iceberg/spec/
values.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18/*!
19 * Value in iceberg
20 */
21
22use std::any::Any;
23use std::cmp::Ordering;
24use std::collections::HashMap;
25use std::fmt::{Display, Formatter};
26use std::hash::Hash;
27use std::ops::Index;
28use std::str::FromStr;
29
30pub use _serde::RawLiteral;
31use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
32use num_bigint::BigInt;
33use ordered_float::{Float, OrderedFloat};
34use rust_decimal::Decimal;
35use rust_decimal::prelude::ToPrimitive;
36use serde::de::{
37    MapAccess, {self},
38};
39use serde::ser::SerializeStruct;
40use serde::{Deserialize, Serialize};
41use serde_bytes::ByteBuf;
42use serde_json::{Map as JsonMap, Number, Value as JsonValue};
43use timestamp::nanoseconds_to_datetime;
44use uuid::Uuid;
45
46use super::datatypes::{PrimitiveType, Type};
47use crate::error::Result;
48use crate::spec::MAX_DECIMAL_PRECISION;
49use crate::spec::values::date::{date_from_naive_date, days_to_date, unix_epoch};
50use crate::spec::values::time::microseconds_to_time;
51use crate::spec::values::timestamp::microseconds_to_datetime;
52use crate::spec::values::timestamptz::{microseconds_to_datetimetz, nanoseconds_to_datetimetz};
53use crate::{Error, ErrorKind, ensure_data_valid};
54
/// Largest microsecond count accepted for [`PrimitiveType::Time`]: one microsecond short of
/// 24 hours, i.e. 23:59:59.999999.
const MAX_TIME_VALUE: i64 = 24 * 60 * 60 * 1_000_000 - 1;

/// Upper bound of a 32-bit signed integer.
const INT_MAX: i32 = i32::MAX;
/// Lower bound of a 32-bit signed integer.
const INT_MIN: i32 = i32::MIN;
/// Upper bound of a 64-bit signed integer.
const LONG_MAX: i64 = i64::MAX;
/// Lower bound of a 64-bit signed integer.
const LONG_MIN: i64 = i64::MIN;
62
63/// Values present in iceberg type
64#[derive(Clone, Debug, PartialOrd, PartialEq, Hash, Eq)]
65pub enum PrimitiveLiteral {
66    /// 0x00 for false, non-zero byte for true
67    Boolean(bool),
68    /// Stored as 4-byte little-endian
69    Int(i32),
70    /// Stored as 8-byte little-endian
71    Long(i64),
72    /// Stored as 4-byte little-endian
73    Float(OrderedFloat<f32>),
74    /// Stored as 8-byte little-endian
75    Double(OrderedFloat<f64>),
76    /// UTF-8 bytes (without length)
77    String(String),
78    /// Binary value (without length)
79    Binary(Vec<u8>),
80    /// Stored as 16-byte little-endian
81    Int128(i128),
82    /// Stored as 16-byte little-endian
83    UInt128(u128),
84    /// When a number is larger than it can hold
85    AboveMax,
86    /// When a number is smaller than it can hold
87    BelowMin,
88}
89
90impl PrimitiveLiteral {
91    /// Returns true if the Literal represents a primitive type
92    /// that can be a NaN, and that it's value is NaN
93    pub fn is_nan(&self) -> bool {
94        match self {
95            PrimitiveLiteral::Double(val) => val.is_nan(),
96            PrimitiveLiteral::Float(val) => val.is_nan(),
97            _ => false,
98        }
99    }
100}
101
102/// Literal associated with its type. The value and type pair is checked when construction, so the type and value is
103/// guaranteed to be correct when used.
104///
105/// By default, we decouple the type and value of a literal, so we can use avoid the cost of storing extra type info
106/// for each literal. But associate type with literal can be useful in some cases, for example, in unbound expression.
107#[derive(Clone, Debug, PartialEq, Hash, Eq)]
108pub struct Datum {
109    r#type: PrimitiveType,
110    literal: PrimitiveLiteral,
111}
112
113impl Serialize for Datum {
114    fn serialize<S: serde::Serializer>(
115        &self,
116        serializer: S,
117    ) -> std::result::Result<S::Ok, S::Error> {
118        let mut struct_ser = serializer
119            .serialize_struct("Datum", 2)
120            .map_err(serde::ser::Error::custom)?;
121        struct_ser
122            .serialize_field("type", &self.r#type)
123            .map_err(serde::ser::Error::custom)?;
124        struct_ser
125            .serialize_field(
126                "literal",
127                &RawLiteral::try_from(
128                    Literal::Primitive(self.literal.clone()),
129                    &Type::Primitive(self.r#type.clone()),
130                )
131                .map_err(serde::ser::Error::custom)?,
132            )
133            .map_err(serde::ser::Error::custom)?;
134        struct_ser.end()
135    }
136}
137
138impl<'de> Deserialize<'de> for Datum {
139    fn deserialize<D: serde::Deserializer<'de>>(
140        deserializer: D,
141    ) -> std::result::Result<Self, D::Error> {
142        #[derive(Deserialize)]
143        #[serde(field_identifier, rename_all = "lowercase")]
144        enum Field {
145            Type,
146            Literal,
147        }
148
149        struct DatumVisitor;
150
151        impl<'de> serde::de::Visitor<'de> for DatumVisitor {
152            type Value = Datum;
153
154            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
155                formatter.write_str("struct Datum")
156            }
157
158            fn visit_seq<A>(self, mut seq: A) -> std::result::Result<Self::Value, A::Error>
159            where A: serde::de::SeqAccess<'de> {
160                let r#type = seq
161                    .next_element::<PrimitiveType>()?
162                    .ok_or_else(|| serde::de::Error::invalid_length(0, &self))?;
163                let value = seq
164                    .next_element::<RawLiteral>()?
165                    .ok_or_else(|| serde::de::Error::invalid_length(1, &self))?;
166                let Literal::Primitive(primitive) = value
167                    .try_into(&Type::Primitive(r#type.clone()))
168                    .map_err(serde::de::Error::custom)?
169                    .ok_or_else(|| serde::de::Error::custom("None value"))?
170                else {
171                    return Err(serde::de::Error::custom("Invalid value"));
172                };
173
174                Ok(Datum::new(r#type, primitive))
175            }
176
177            fn visit_map<V>(self, mut map: V) -> std::result::Result<Datum, V::Error>
178            where V: MapAccess<'de> {
179                let mut raw_primitive: Option<RawLiteral> = None;
180                let mut r#type: Option<PrimitiveType> = None;
181                while let Some(key) = map.next_key()? {
182                    match key {
183                        Field::Type => {
184                            if r#type.is_some() {
185                                return Err(de::Error::duplicate_field("type"));
186                            }
187                            r#type = Some(map.next_value()?);
188                        }
189                        Field::Literal => {
190                            if raw_primitive.is_some() {
191                                return Err(de::Error::duplicate_field("literal"));
192                            }
193                            raw_primitive = Some(map.next_value()?);
194                        }
195                    }
196                }
197                let Some(r#type) = r#type else {
198                    return Err(serde::de::Error::missing_field("type"));
199                };
200                let Some(raw_primitive) = raw_primitive else {
201                    return Err(serde::de::Error::missing_field("literal"));
202                };
203                let Literal::Primitive(primitive) = raw_primitive
204                    .try_into(&Type::Primitive(r#type.clone()))
205                    .map_err(serde::de::Error::custom)?
206                    .ok_or_else(|| serde::de::Error::custom("None value"))?
207                else {
208                    return Err(serde::de::Error::custom("Invalid value"));
209                };
210                Ok(Datum::new(r#type, primitive))
211            }
212        }
213        const FIELDS: &[&str] = &["type", "literal"];
214        deserializer.deserialize_struct("Datum", FIELDS, DatumVisitor)
215    }
216}
217
218// Compare following iceberg float ordering rules:
219//  -NaN < -Infinity < -value < -0 < 0 < value < Infinity < NaN
220fn iceberg_float_cmp<T: Float>(a: T, b: T) -> Option<Ordering> {
221    if a.is_nan() && b.is_nan() {
222        return match (a.is_sign_negative(), b.is_sign_negative()) {
223            (true, false) => Some(Ordering::Less),
224            (false, true) => Some(Ordering::Greater),
225            _ => Some(Ordering::Equal),
226        };
227    }
228
229    if a.is_nan() {
230        return Some(if a.is_sign_negative() {
231            Ordering::Less
232        } else {
233            Ordering::Greater
234        });
235    }
236
237    if b.is_nan() {
238        return Some(if b.is_sign_negative() {
239            Ordering::Greater
240        } else {
241            Ordering::Less
242        });
243    }
244
245    a.partial_cmp(&b)
246}
247
248impl PartialOrd for Datum {
249    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
250        match (&self.literal, &other.literal, &self.r#type, &other.r#type) {
251            // generate the arm with same type and same literal
252            (
253                PrimitiveLiteral::Boolean(val),
254                PrimitiveLiteral::Boolean(other_val),
255                PrimitiveType::Boolean,
256                PrimitiveType::Boolean,
257            ) => val.partial_cmp(other_val),
258            (
259                PrimitiveLiteral::Int(val),
260                PrimitiveLiteral::Int(other_val),
261                PrimitiveType::Int,
262                PrimitiveType::Int,
263            ) => val.partial_cmp(other_val),
264            (
265                PrimitiveLiteral::Long(val),
266                PrimitiveLiteral::Long(other_val),
267                PrimitiveType::Long,
268                PrimitiveType::Long,
269            ) => val.partial_cmp(other_val),
270            (
271                PrimitiveLiteral::Float(val),
272                PrimitiveLiteral::Float(other_val),
273                PrimitiveType::Float,
274                PrimitiveType::Float,
275            ) => iceberg_float_cmp(*val, *other_val),
276            (
277                PrimitiveLiteral::Double(val),
278                PrimitiveLiteral::Double(other_val),
279                PrimitiveType::Double,
280                PrimitiveType::Double,
281            ) => iceberg_float_cmp(*val, *other_val),
282            (
283                PrimitiveLiteral::Int(val),
284                PrimitiveLiteral::Int(other_val),
285                PrimitiveType::Date,
286                PrimitiveType::Date,
287            ) => val.partial_cmp(other_val),
288            (
289                PrimitiveLiteral::Long(val),
290                PrimitiveLiteral::Long(other_val),
291                PrimitiveType::Time,
292                PrimitiveType::Time,
293            ) => val.partial_cmp(other_val),
294            (
295                PrimitiveLiteral::Long(val),
296                PrimitiveLiteral::Long(other_val),
297                PrimitiveType::Timestamp,
298                PrimitiveType::Timestamp,
299            ) => val.partial_cmp(other_val),
300            (
301                PrimitiveLiteral::Long(val),
302                PrimitiveLiteral::Long(other_val),
303                PrimitiveType::Timestamptz,
304                PrimitiveType::Timestamptz,
305            ) => val.partial_cmp(other_val),
306            (
307                PrimitiveLiteral::String(val),
308                PrimitiveLiteral::String(other_val),
309                PrimitiveType::String,
310                PrimitiveType::String,
311            ) => val.partial_cmp(other_val),
312            (
313                PrimitiveLiteral::UInt128(val),
314                PrimitiveLiteral::UInt128(other_val),
315                PrimitiveType::Uuid,
316                PrimitiveType::Uuid,
317            ) => Uuid::from_u128(*val).partial_cmp(&Uuid::from_u128(*other_val)),
318            (
319                PrimitiveLiteral::Binary(val),
320                PrimitiveLiteral::Binary(other_val),
321                PrimitiveType::Fixed(_),
322                PrimitiveType::Fixed(_),
323            ) => val.partial_cmp(other_val),
324            (
325                PrimitiveLiteral::Binary(val),
326                PrimitiveLiteral::Binary(other_val),
327                PrimitiveType::Binary,
328                PrimitiveType::Binary,
329            ) => val.partial_cmp(other_val),
330            (
331                PrimitiveLiteral::Int128(val),
332                PrimitiveLiteral::Int128(other_val),
333                PrimitiveType::Decimal {
334                    precision: _,
335                    scale,
336                },
337                PrimitiveType::Decimal {
338                    precision: _,
339                    scale: other_scale,
340                },
341            ) => {
342                let val = Decimal::from_i128_with_scale(*val, *scale);
343                let other_val = Decimal::from_i128_with_scale(*other_val, *other_scale);
344                val.partial_cmp(&other_val)
345            }
346            _ => None,
347        }
348    }
349}
350
351impl Display for Datum {
352    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
353        match (&self.r#type, &self.literal) {
354            (_, PrimitiveLiteral::Boolean(val)) => write!(f, "{}", val),
355            (PrimitiveType::Int, PrimitiveLiteral::Int(val)) => write!(f, "{}", val),
356            (PrimitiveType::Long, PrimitiveLiteral::Long(val)) => write!(f, "{}", val),
357            (_, PrimitiveLiteral::Float(val)) => write!(f, "{}", val),
358            (_, PrimitiveLiteral::Double(val)) => write!(f, "{}", val),
359            (PrimitiveType::Date, PrimitiveLiteral::Int(val)) => {
360                write!(f, "{}", days_to_date(*val))
361            }
362            (PrimitiveType::Time, PrimitiveLiteral::Long(val)) => {
363                write!(f, "{}", microseconds_to_time(*val))
364            }
365            (PrimitiveType::Timestamp, PrimitiveLiteral::Long(val)) => {
366                write!(f, "{}", microseconds_to_datetime(*val))
367            }
368            (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(val)) => {
369                write!(f, "{}", microseconds_to_datetimetz(*val))
370            }
371            (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(val)) => {
372                write!(f, "{}", nanoseconds_to_datetime(*val))
373            }
374            (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(val)) => {
375                write!(f, "{}", nanoseconds_to_datetimetz(*val))
376            }
377            (_, PrimitiveLiteral::String(val)) => write!(f, r#""{}""#, val),
378            (PrimitiveType::Uuid, PrimitiveLiteral::UInt128(val)) => {
379                write!(f, "{}", Uuid::from_u128(*val))
380            }
381            (_, PrimitiveLiteral::Binary(val)) => display_bytes(val, f),
382            (
383                PrimitiveType::Decimal {
384                    precision: _,
385                    scale,
386                },
387                PrimitiveLiteral::Int128(val),
388            ) => {
389                write!(f, "{}", Decimal::from_i128_with_scale(*val, *scale))
390            }
391            (_, _) => {
392                unreachable!()
393            }
394        }
395    }
396}
397
/// Writes `bytes` to `f` as upper-case hexadecimal, two digits per byte and no separators.
///
/// Each byte is formatted straight into the formatter, so no intermediate `String` is
/// allocated. An empty slice writes nothing.
fn display_bytes(bytes: &[u8], f: &mut Formatter<'_>) -> std::fmt::Result {
    bytes.iter().try_for_each(|byte| write!(f, "{byte:02X}"))
}
405
406impl From<Datum> for Literal {
407    fn from(value: Datum) -> Self {
408        Literal::Primitive(value.literal)
409    }
410}
411
412impl From<Datum> for PrimitiveLiteral {
413    fn from(value: Datum) -> Self {
414        value.literal
415    }
416}
417
418impl Datum {
419    /// Creates a `Datum` from a `PrimitiveType` and a `PrimitiveLiteral`
420    pub(crate) fn new(r#type: PrimitiveType, literal: PrimitiveLiteral) -> Self {
421        Datum { r#type, literal }
422    }
423
424    /// Create iceberg value from bytes.
425    ///
426    /// See [this spec](https://iceberg.apache.org/spec/#binary-single-value-serialization) for reference.
427    pub fn try_from_bytes(bytes: &[u8], data_type: PrimitiveType) -> Result<Self> {
428        let literal = match data_type {
429            PrimitiveType::Boolean => {
430                if bytes.len() == 1 && bytes[0] == 0u8 {
431                    PrimitiveLiteral::Boolean(false)
432                } else {
433                    PrimitiveLiteral::Boolean(true)
434                }
435            }
436            PrimitiveType::Int => PrimitiveLiteral::Int(i32::from_le_bytes(bytes.try_into()?)),
437            PrimitiveType::Long => {
438                if bytes.len() == 4 {
439                    // In the case of an evolved field
440                    PrimitiveLiteral::Long(i32::from_le_bytes(bytes.try_into()?) as i64)
441                } else {
442                    PrimitiveLiteral::Long(i64::from_le_bytes(bytes.try_into()?))
443                }
444            }
445            PrimitiveType::Float => {
446                PrimitiveLiteral::Float(OrderedFloat(f32::from_le_bytes(bytes.try_into()?)))
447            }
448            PrimitiveType::Double => {
449                if bytes.len() == 4 {
450                    // In the case of an evolved field
451                    PrimitiveLiteral::Double(OrderedFloat(
452                        f32::from_le_bytes(bytes.try_into()?) as f64
453                    ))
454                } else {
455                    PrimitiveLiteral::Double(OrderedFloat(f64::from_le_bytes(bytes.try_into()?)))
456                }
457            }
458            PrimitiveType::Date => PrimitiveLiteral::Int(i32::from_le_bytes(bytes.try_into()?)),
459            PrimitiveType::Time => PrimitiveLiteral::Long(i64::from_le_bytes(bytes.try_into()?)),
460            PrimitiveType::Timestamp => {
461                PrimitiveLiteral::Long(i64::from_le_bytes(bytes.try_into()?))
462            }
463            PrimitiveType::Timestamptz => {
464                PrimitiveLiteral::Long(i64::from_le_bytes(bytes.try_into()?))
465            }
466            PrimitiveType::TimestampNs => {
467                PrimitiveLiteral::Long(i64::from_le_bytes(bytes.try_into()?))
468            }
469            PrimitiveType::TimestamptzNs => {
470                PrimitiveLiteral::Long(i64::from_le_bytes(bytes.try_into()?))
471            }
472            PrimitiveType::String => {
473                PrimitiveLiteral::String(std::str::from_utf8(bytes)?.to_string())
474            }
475            PrimitiveType::Uuid => {
476                PrimitiveLiteral::UInt128(u128::from_be_bytes(bytes.try_into()?))
477            }
478            PrimitiveType::Fixed(_) => PrimitiveLiteral::Binary(Vec::from(bytes)),
479            PrimitiveType::Binary => PrimitiveLiteral::Binary(Vec::from(bytes)),
480            PrimitiveType::Decimal { .. } => {
481                let unscaled_value = BigInt::from_signed_bytes_be(bytes);
482                PrimitiveLiteral::Int128(unscaled_value.to_i128().ok_or_else(|| {
483                    Error::new(
484                        ErrorKind::DataInvalid,
485                        format!("Can't convert bytes to i128: {:?}", bytes),
486                    )
487                })?)
488            }
489        };
490        Ok(Datum::new(data_type, literal))
491    }
492
493    /// Convert the value to bytes
494    ///
495    /// See [this spec](https://iceberg.apache.org/spec/#binary-single-value-serialization) for reference.
496    pub fn to_bytes(&self) -> Result<ByteBuf> {
497        let buf = match &self.literal {
498            PrimitiveLiteral::Boolean(val) => {
499                if *val {
500                    ByteBuf::from([1u8])
501                } else {
502                    ByteBuf::from([0u8])
503                }
504            }
505            PrimitiveLiteral::Int(val) => ByteBuf::from(val.to_le_bytes()),
506            PrimitiveLiteral::Long(val) => ByteBuf::from(val.to_le_bytes()),
507            PrimitiveLiteral::Float(val) => ByteBuf::from(val.to_le_bytes()),
508            PrimitiveLiteral::Double(val) => ByteBuf::from(val.to_le_bytes()),
509            PrimitiveLiteral::String(val) => ByteBuf::from(val.as_bytes()),
510            PrimitiveLiteral::UInt128(val) => ByteBuf::from(val.to_be_bytes()),
511            PrimitiveLiteral::Binary(val) => ByteBuf::from(val.as_slice()),
512            PrimitiveLiteral::Int128(val) => {
513                let PrimitiveType::Decimal { precision, .. } = self.r#type else {
514                    return Err(Error::new(
515                        ErrorKind::DataInvalid,
516                        format!(
517                            "PrimitiveLiteral Int128 must be PrimitiveType Decimal but got {}",
518                            &self.r#type
519                        ),
520                    ));
521                };
522
523                // It's required by iceberg spec that we must keep the minimum
524                // number of bytes for the value
525                let Ok(required_bytes) = Type::decimal_required_bytes(precision) else {
526                    return Err(Error::new(
527                        ErrorKind::DataInvalid,
528                        format!(
529                            "PrimitiveType Decimal must has valid precision but got {}",
530                            precision
531                        ),
532                    ));
533                };
534
535                // The primitive literal is unscaled value.
536                let unscaled_value = BigInt::from(*val);
537                // Convert into two's-complement byte representation of the BigInt
538                // in big-endian byte order.
539                let mut bytes = unscaled_value.to_signed_bytes_be();
540                // Truncate with required bytes to make sure.
541                bytes.truncate(required_bytes as usize);
542
543                ByteBuf::from(bytes)
544            }
545            PrimitiveLiteral::AboveMax | PrimitiveLiteral::BelowMin => {
546                return Err(Error::new(
547                    ErrorKind::DataInvalid,
548                    "Cannot convert AboveMax or BelowMin to bytes".to_string(),
549                ));
550            }
551        };
552
553        Ok(buf)
554    }
555
556    /// Creates a boolean value.
557    ///
558    /// Example:
559    /// ```rust
560    /// use iceberg::spec::{Datum, Literal, PrimitiveLiteral};
561    /// let t = Datum::bool(true);
562    ///
563    /// assert_eq!(format!("{}", t), "true".to_string());
564    /// assert_eq!(
565    ///     Literal::from(t),
566    ///     Literal::Primitive(PrimitiveLiteral::Boolean(true))
567    /// );
568    /// ```
569    pub fn bool<T: Into<bool>>(t: T) -> Self {
570        Self {
571            r#type: PrimitiveType::Boolean,
572            literal: PrimitiveLiteral::Boolean(t.into()),
573        }
574    }
575
576    /// Creates a boolean value from string.
577    /// See [Parse bool from str](https://doc.rust-lang.org/stable/std/primitive.bool.html#impl-FromStr-for-bool) for reference.
578    ///
579    /// Example:
580    /// ```rust
581    /// use iceberg::spec::{Datum, Literal, PrimitiveLiteral};
582    /// let t = Datum::bool_from_str("false").unwrap();
583    ///
584    /// assert_eq!(&format!("{}", t), "false");
585    /// assert_eq!(
586    ///     Literal::Primitive(PrimitiveLiteral::Boolean(false)),
587    ///     t.into()
588    /// );
589    /// ```
590    pub fn bool_from_str<S: AsRef<str>>(s: S) -> Result<Self> {
591        let v = s.as_ref().parse::<bool>().map_err(|e| {
592            Error::new(ErrorKind::DataInvalid, "Can't parse string to bool.").with_source(e)
593        })?;
594        Ok(Self::bool(v))
595    }
596
597    /// Creates an 32bit integer.
598    ///
599    /// Example:
600    /// ```rust
601    /// use iceberg::spec::{Datum, Literal, PrimitiveLiteral};
602    /// let t = Datum::int(23i8);
603    ///
604    /// assert_eq!(&format!("{}", t), "23");
605    /// assert_eq!(Literal::Primitive(PrimitiveLiteral::Int(23)), t.into());
606    /// ```
607    pub fn int<T: Into<i32>>(t: T) -> Self {
608        Self {
609            r#type: PrimitiveType::Int,
610            literal: PrimitiveLiteral::Int(t.into()),
611        }
612    }
613
614    /// Creates an 64bit integer.
615    ///
616    /// Example:
617    /// ```rust
618    /// use iceberg::spec::{Datum, Literal, PrimitiveLiteral};
619    /// let t = Datum::long(24i8);
620    ///
621    /// assert_eq!(&format!("{t}"), "24");
622    /// assert_eq!(Literal::Primitive(PrimitiveLiteral::Long(24)), t.into());
623    /// ```
624    pub fn long<T: Into<i64>>(t: T) -> Self {
625        Self {
626            r#type: PrimitiveType::Long,
627            literal: PrimitiveLiteral::Long(t.into()),
628        }
629    }
630
631    /// Creates an 32bit floating point number.
632    ///
633    /// Example:
634    /// ```rust
635    /// use iceberg::spec::{Datum, Literal, PrimitiveLiteral};
636    /// use ordered_float::OrderedFloat;
637    /// let t = Datum::float(32.1f32);
638    ///
639    /// assert_eq!(&format!("{t}"), "32.1");
640    /// assert_eq!(
641    ///     Literal::Primitive(PrimitiveLiteral::Float(OrderedFloat(32.1))),
642    ///     t.into()
643    /// );
644    /// ```
645    pub fn float<T: Into<f32>>(t: T) -> Self {
646        Self {
647            r#type: PrimitiveType::Float,
648            literal: PrimitiveLiteral::Float(OrderedFloat(t.into())),
649        }
650    }
651
652    /// Creates an 64bit floating point number.
653    ///
654    /// Example:
655    /// ```rust
656    /// use iceberg::spec::{Datum, Literal, PrimitiveLiteral};
657    /// use ordered_float::OrderedFloat;
658    /// let t = Datum::double(32.1f64);
659    ///
660    /// assert_eq!(&format!("{t}"), "32.1");
661    /// assert_eq!(
662    ///     Literal::Primitive(PrimitiveLiteral::Double(OrderedFloat(32.1))),
663    ///     t.into()
664    /// );
665    /// ```
666    pub fn double<T: Into<f64>>(t: T) -> Self {
667        Self {
668            r#type: PrimitiveType::Double,
669            literal: PrimitiveLiteral::Double(OrderedFloat(t.into())),
670        }
671    }
672
673    /// Creates date literal from number of days from unix epoch directly.
674    ///
675    /// Example:
676    /// ```rust
677    /// use iceberg::spec::{Datum, Literal, PrimitiveLiteral};
678    /// // 2 days after 1970-01-01
679    /// let t = Datum::date(2);
680    ///
681    /// assert_eq!(&format!("{t}"), "1970-01-03");
682    /// assert_eq!(Literal::Primitive(PrimitiveLiteral::Int(2)), t.into());
683    /// ```
684    pub fn date(days: i32) -> Self {
685        Self {
686            r#type: PrimitiveType::Date,
687            literal: PrimitiveLiteral::Int(days),
688        }
689    }
690
691    /// Creates date literal in `%Y-%m-%d` format, assume in utc timezone.
692    ///
693    /// See [`NaiveDate::from_str`].
694    ///
695    /// Example
696    /// ```rust
697    /// use iceberg::spec::{Datum, Literal};
698    /// let t = Datum::date_from_str("1970-01-05").unwrap();
699    ///
700    /// assert_eq!(&format!("{t}"), "1970-01-05");
701    /// assert_eq!(Literal::date(4), t.into());
702    /// ```
703    pub fn date_from_str<S: AsRef<str>>(s: S) -> Result<Self> {
704        let t = s.as_ref().parse::<NaiveDate>().map_err(|e| {
705            Error::new(
706                ErrorKind::DataInvalid,
707                format!("Can't parse date from string: {}", s.as_ref()),
708            )
709            .with_source(e)
710        })?;
711
712        Ok(Self::date(date_from_naive_date(t)))
713    }
714
715    /// Create date literal from calendar date (year, month and day).
716    ///
717    /// See [`NaiveDate::from_ymd_opt`].
718    ///
719    /// Example:
720    ///
721    ///```rust
722    /// use iceberg::spec::{Datum, Literal};
723    /// let t = Datum::date_from_ymd(1970, 1, 5).unwrap();
724    ///
725    /// assert_eq!(&format!("{t}"), "1970-01-05");
726    /// assert_eq!(Literal::date(4), t.into());
727    /// ```
728    pub fn date_from_ymd(year: i32, month: u32, day: u32) -> Result<Self> {
729        let t = NaiveDate::from_ymd_opt(year, month, day).ok_or_else(|| {
730            Error::new(
731                ErrorKind::DataInvalid,
732                format!("Can't create date from year: {year}, month: {month}, day: {day}"),
733            )
734        })?;
735
736        Ok(Self::date(date_from_naive_date(t)))
737    }
738
739    /// Creates time literal in microseconds directly.
740    ///
741    /// It will return error when it's negative or too large to fit in 24 hours.
742    ///
743    /// Example:
744    ///
745    /// ```rust
746    /// use iceberg::spec::{Datum, Literal};
747    /// let micro_secs = {
748    ///     1 * 3600 * 1_000_000 + // 1 hour
749    ///     2 * 60 * 1_000_000 +   // 2 minutes
750    ///     1 * 1_000_000 + // 1 second
751    ///     888999 // microseconds
752    /// };
753    ///
754    /// let t = Datum::time_micros(micro_secs).unwrap();
755    ///
756    /// assert_eq!(&format!("{t}"), "01:02:01.888999");
757    /// assert_eq!(Literal::time(micro_secs), t.into());
758    ///
759    /// let negative_value = -100;
760    /// assert!(Datum::time_micros(negative_value).is_err());
761    ///
762    /// let too_large_value = 36 * 60 * 60 * 1_000_000; // Too large to fit in 24 hours.
763    /// assert!(Datum::time_micros(too_large_value).is_err());
764    /// ```
765    pub fn time_micros(value: i64) -> Result<Self> {
766        ensure_data_valid!(
767            (0..=MAX_TIME_VALUE).contains(&value),
768            "Invalid value for Time type: {}",
769            value
770        );
771
772        Ok(Self {
773            r#type: PrimitiveType::Time,
774            literal: PrimitiveLiteral::Long(value),
775        })
776    }
777
778    /// Creates time literal from [`chrono::NaiveTime`].
779    fn time_from_naive_time(t: NaiveTime) -> Self {
780        let duration = t - unix_epoch().time();
781        // It's safe to unwrap here since less than 24 hours will never overflow.
782        let micro_secs = duration.num_microseconds().unwrap();
783
784        Self {
785            r#type: PrimitiveType::Time,
786            literal: PrimitiveLiteral::Long(micro_secs),
787        }
788    }
789
790    /// Creates time literal in microseconds in `%H:%M:%S:.f` format.
791    ///
792    /// See [`NaiveTime::from_str`] for details.
793    ///
794    /// Example:
795    /// ```rust
796    /// use iceberg::spec::{Datum, Literal};
797    /// let t = Datum::time_from_str("01:02:01.888999777").unwrap();
798    ///
799    /// assert_eq!(&format!("{t}"), "01:02:01.888999");
800    /// ```
801    pub fn time_from_str<S: AsRef<str>>(s: S) -> Result<Self> {
802        let t = s.as_ref().parse::<NaiveTime>().map_err(|e| {
803            Error::new(
804                ErrorKind::DataInvalid,
805                format!("Can't parse time from string: {}", s.as_ref()),
806            )
807            .with_source(e)
808        })?;
809
810        Ok(Self::time_from_naive_time(t))
811    }
812
813    /// Creates time literal from hour, minute, second, and microseconds.
814    ///
815    /// See [`NaiveTime::from_hms_micro_opt`].
816    ///
817    /// Example:
818    /// ```rust
819    /// use iceberg::spec::{Datum, Literal};
820    /// let t = Datum::time_from_hms_micro(22, 15, 33, 111).unwrap();
821    ///
822    /// assert_eq!(&format!("{t}"), "22:15:33.000111");
823    /// ```
824    pub fn time_from_hms_micro(hour: u32, min: u32, sec: u32, micro: u32) -> Result<Self> {
825        let t = NaiveTime::from_hms_micro_opt(hour, min, sec, micro)
826            .ok_or_else(|| Error::new(
827                ErrorKind::DataInvalid,
828                format!("Can't create time from hour: {hour}, min: {min}, second: {sec}, microsecond: {micro}"),
829            ))?;
830        Ok(Self::time_from_naive_time(t))
831    }
832
833    /// Creates a timestamp from unix epoch in microseconds.
834    ///
835    /// Example:
836    ///
837    /// ```rust
838    /// use iceberg::spec::Datum;
839    /// let t = Datum::timestamp_micros(1000);
840    ///
841    /// assert_eq!(&format!("{t}"), "1970-01-01 00:00:00.001");
842    /// ```
843    pub fn timestamp_micros(value: i64) -> Self {
844        Self {
845            r#type: PrimitiveType::Timestamp,
846            literal: PrimitiveLiteral::Long(value),
847        }
848    }
849
850    /// Creates a timestamp from unix epoch in nanoseconds.
851    ///
852    /// Example:
853    ///
854    /// ```rust
855    /// use iceberg::spec::Datum;
856    /// let t = Datum::timestamp_nanos(1000);
857    ///
858    /// assert_eq!(&format!("{t}"), "1970-01-01 00:00:00.000001");
859    /// ```
860    pub fn timestamp_nanos(value: i64) -> Self {
861        Self {
862            r#type: PrimitiveType::TimestampNs,
863            literal: PrimitiveLiteral::Long(value),
864        }
865    }
866
867    /// Creates a timestamp from [`DateTime`].
868    ///
869    /// Example:
870    ///
871    /// ```rust
872    /// use chrono::{NaiveDate, NaiveDateTime, TimeZone, Utc};
873    /// use iceberg::spec::Datum;
874    /// let t = Datum::timestamp_from_datetime(
875    ///     NaiveDate::from_ymd_opt(1992, 3, 1)
876    ///         .unwrap()
877    ///         .and_hms_micro_opt(1, 2, 3, 88)
878    ///         .unwrap(),
879    /// );
880    ///
881    /// assert_eq!(&format!("{t}"), "1992-03-01 01:02:03.000088");
882    /// ```
883    pub fn timestamp_from_datetime(dt: NaiveDateTime) -> Self {
884        Self::timestamp_micros(dt.and_utc().timestamp_micros())
885    }
886
887    /// Parse a timestamp in [`%Y-%m-%dT%H:%M:%S%.f`] format.
888    ///
889    /// See [`NaiveDateTime::from_str`].
890    ///
891    /// Example:
892    ///
893    /// ```rust
894    /// use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime};
895    /// use iceberg::spec::{Datum, Literal};
896    /// let t = Datum::timestamp_from_str("1992-03-01T01:02:03.000088").unwrap();
897    ///
898    /// assert_eq!(&format!("{t}"), "1992-03-01 01:02:03.000088");
899    /// ```
900    pub fn timestamp_from_str<S: AsRef<str>>(s: S) -> Result<Self> {
901        let dt = s.as_ref().parse::<NaiveDateTime>().map_err(|e| {
902            Error::new(ErrorKind::DataInvalid, "Can't parse timestamp.").with_source(e)
903        })?;
904
905        Ok(Self::timestamp_from_datetime(dt))
906    }
907
908    /// Creates a timestamp with timezone from unix epoch in microseconds.
909    ///
910    /// Example:
911    ///
912    /// ```rust
913    /// use iceberg::spec::Datum;
914    /// let t = Datum::timestamptz_micros(1000);
915    ///
916    /// assert_eq!(&format!("{t}"), "1970-01-01 00:00:00.001 UTC");
917    /// ```
918    pub fn timestamptz_micros(value: i64) -> Self {
919        Self {
920            r#type: PrimitiveType::Timestamptz,
921            literal: PrimitiveLiteral::Long(value),
922        }
923    }
924
925    /// Creates a timestamp with timezone from unix epoch in nanoseconds.
926    ///
927    /// Example:
928    ///
929    /// ```rust
930    /// use iceberg::spec::Datum;
931    /// let t = Datum::timestamptz_nanos(1000);
932    ///
933    /// assert_eq!(&format!("{t}"), "1970-01-01 00:00:00.000001 UTC");
934    /// ```
935    pub fn timestamptz_nanos(value: i64) -> Self {
936        Self {
937            r#type: PrimitiveType::TimestamptzNs,
938            literal: PrimitiveLiteral::Long(value),
939        }
940    }
941
942    /// Creates a timestamp with timezone from [`DateTime`].
943    /// Example:
944    ///
945    /// ```rust
946    /// use chrono::{TimeZone, Utc};
947    /// use iceberg::spec::Datum;
948    /// let t = Datum::timestamptz_from_datetime(Utc.timestamp_opt(1000, 0).unwrap());
949    ///
950    /// assert_eq!(&format!("{t}"), "1970-01-01 00:16:40 UTC");
951    /// ```
952    pub fn timestamptz_from_datetime<T: TimeZone>(dt: DateTime<T>) -> Self {
953        Self::timestamptz_micros(dt.with_timezone(&Utc).timestamp_micros())
954    }
955
956    /// Parse timestamp with timezone in RFC3339 format.
957    ///
958    /// See [`DateTime::from_str`].
959    ///
960    /// Example:
961    ///
962    /// ```rust
963    /// use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime};
964    /// use iceberg::spec::{Datum, Literal};
965    /// let t = Datum::timestamptz_from_str("1992-03-01T01:02:03.000088+08:00").unwrap();
966    ///
967    /// assert_eq!(&format!("{t}"), "1992-02-29 17:02:03.000088 UTC");
968    /// ```
969    pub fn timestamptz_from_str<S: AsRef<str>>(s: S) -> Result<Self> {
970        let dt = DateTime::<Utc>::from_str(s.as_ref()).map_err(|e| {
971            Error::new(ErrorKind::DataInvalid, "Can't parse datetime.").with_source(e)
972        })?;
973
974        Ok(Self::timestamptz_from_datetime(dt))
975    }
976
977    /// Creates a string literal.
978    ///
979    /// Example:
980    ///
981    /// ```rust
982    /// use iceberg::spec::Datum;
983    /// let t = Datum::string("ss");
984    ///
985    /// assert_eq!(&format!("{t}"), r#""ss""#);
986    /// ```
987    pub fn string<S: ToString>(s: S) -> Self {
988        Self {
989            r#type: PrimitiveType::String,
990            literal: PrimitiveLiteral::String(s.to_string()),
991        }
992    }
993
994    /// Creates uuid literal.
995    ///
996    /// Example:
997    ///
998    /// ```rust
999    /// use iceberg::spec::Datum;
1000    /// use uuid::uuid;
1001    /// let t = Datum::uuid(uuid!("a1a2a3a4-b1b2-c1c2-d1d2-d3d4d5d6d7d8"));
1002    ///
1003    /// assert_eq!(&format!("{t}"), "a1a2a3a4-b1b2-c1c2-d1d2-d3d4d5d6d7d8");
1004    /// ```
1005    pub fn uuid(uuid: Uuid) -> Self {
1006        Self {
1007            r#type: PrimitiveType::Uuid,
1008            literal: PrimitiveLiteral::UInt128(uuid.as_u128()),
1009        }
1010    }
1011
1012    /// Creates uuid from str. See [`Uuid::parse_str`].
1013    ///
1014    /// Example:
1015    ///
1016    /// ```rust
1017    /// use iceberg::spec::Datum;
1018    /// let t = Datum::uuid_from_str("a1a2a3a4-b1b2-c1c2-d1d2-d3d4d5d6d7d8").unwrap();
1019    ///
1020    /// assert_eq!(&format!("{t}"), "a1a2a3a4-b1b2-c1c2-d1d2-d3d4d5d6d7d8");
1021    /// ```
1022    pub fn uuid_from_str<S: AsRef<str>>(s: S) -> Result<Self> {
1023        let uuid = Uuid::parse_str(s.as_ref()).map_err(|e| {
1024            Error::new(
1025                ErrorKind::DataInvalid,
1026                format!("Can't parse uuid from string: {}", s.as_ref()),
1027            )
1028            .with_source(e)
1029        })?;
1030        Ok(Self::uuid(uuid))
1031    }
1032
1033    /// Creates a fixed literal from bytes.
1034    ///
1035    /// Example:
1036    ///
1037    /// ```rust
1038    /// use iceberg::spec::{Datum, Literal, PrimitiveLiteral};
1039    /// let t = Datum::fixed(vec![1u8, 2u8]);
1040    ///
1041    /// assert_eq!(&format!("{t}"), "0102");
1042    /// ```
1043    pub fn fixed<I: IntoIterator<Item = u8>>(input: I) -> Self {
1044        let value: Vec<u8> = input.into_iter().collect();
1045        Self {
1046            r#type: PrimitiveType::Fixed(value.len() as u64),
1047            literal: PrimitiveLiteral::Binary(value),
1048        }
1049    }
1050
1051    /// Creates a binary literal from bytes.
1052    ///
1053    /// Example:
1054    ///
1055    /// ```rust
1056    /// use iceberg::spec::Datum;
1057    /// let t = Datum::binary(vec![1u8, 100u8]);
1058    ///
1059    /// assert_eq!(&format!("{t}"), "0164");
1060    /// ```
1061    pub fn binary<I: IntoIterator<Item = u8>>(input: I) -> Self {
1062        Self {
1063            r#type: PrimitiveType::Binary,
1064            literal: PrimitiveLiteral::Binary(input.into_iter().collect()),
1065        }
1066    }
1067
1068    /// Creates decimal literal from string. See [`Decimal::from_str_exact`].
1069    ///
1070    /// Example:
1071    ///
1072    /// ```rust
1073    /// use iceberg::spec::Datum;
1074    /// use itertools::assert_equal;
1075    /// use rust_decimal::Decimal;
1076    /// let t = Datum::decimal_from_str("123.45").unwrap();
1077    ///
1078    /// assert_eq!(&format!("{t}"), "123.45");
1079    /// ```
1080    pub fn decimal_from_str<S: AsRef<str>>(s: S) -> Result<Self> {
1081        let decimal = Decimal::from_str_exact(s.as_ref()).map_err(|e| {
1082            Error::new(ErrorKind::DataInvalid, "Can't parse decimal.").with_source(e)
1083        })?;
1084
1085        Self::decimal(decimal)
1086    }
1087
1088    /// Try to create a decimal literal from [`Decimal`].
1089    ///
1090    /// Example:
1091    ///
1092    /// ```rust
1093    /// use iceberg::spec::Datum;
1094    /// use rust_decimal::Decimal;
1095    ///
1096    /// let t = Datum::decimal(Decimal::new(123, 2)).unwrap();
1097    ///
1098    /// assert_eq!(&format!("{t}"), "1.23");
1099    /// ```
1100    pub fn decimal(value: impl Into<Decimal>) -> Result<Self> {
1101        let decimal = value.into();
1102        let scale = decimal.scale();
1103
1104        let r#type = Type::decimal(MAX_DECIMAL_PRECISION, scale)?;
1105        if let Type::Primitive(p) = r#type {
1106            Ok(Self {
1107                r#type: p,
1108                literal: PrimitiveLiteral::Int128(decimal.mantissa()),
1109            })
1110        } else {
1111            unreachable!("Decimal type must be primitive.")
1112        }
1113    }
1114
1115    /// Try to create a decimal literal from [`Decimal`] with precision.
1116    ///
1117    /// Example:
1118    ///
1119    /// ```rust
1120    /// use iceberg::spec::Datum;
1121    /// use rust_decimal::Decimal;
1122    ///
1123    /// let t = Datum::decimal_with_precision(Decimal::new(123, 2), 30).unwrap();
1124    ///
1125    /// assert_eq!(&format!("{t}"), "1.23");
1126    /// ```
1127    pub fn decimal_with_precision(value: impl Into<Decimal>, precision: u32) -> Result<Self> {
1128        let decimal = value.into();
1129        let scale = decimal.scale();
1130
1131        let available_bytes = Type::decimal_required_bytes(precision)? as usize;
1132        let unscaled_value = BigInt::from(decimal.mantissa());
1133        let actual_bytes = unscaled_value.to_signed_bytes_be();
1134        if actual_bytes.len() > available_bytes {
1135            return Err(Error::new(
1136                ErrorKind::DataInvalid,
1137                format!(
1138                    "Decimal value {} is too large for precision {}",
1139                    decimal, precision
1140                ),
1141            ));
1142        }
1143
1144        let r#type = Type::decimal(precision, scale)?;
1145        if let Type::Primitive(p) = r#type {
1146            Ok(Self {
1147                r#type: p,
1148                literal: PrimitiveLiteral::Int128(decimal.mantissa()),
1149            })
1150        } else {
1151            unreachable!("Decimal type must be primitive.")
1152        }
1153    }
1154
1155    fn i64_to_i32<T: Into<i64> + PartialOrd<i64>>(val: T) -> Datum {
1156        if val > INT_MAX as i64 {
1157            Datum::new(PrimitiveType::Int, PrimitiveLiteral::AboveMax)
1158        } else if val < INT_MIN as i64 {
1159            Datum::new(PrimitiveType::Int, PrimitiveLiteral::BelowMin)
1160        } else {
1161            Datum::int(val.into() as i32)
1162        }
1163    }
1164
1165    fn i128_to_i32<T: Into<i128> + PartialOrd<i128>>(val: T) -> Datum {
1166        if val > INT_MAX as i128 {
1167            Datum::new(PrimitiveType::Int, PrimitiveLiteral::AboveMax)
1168        } else if val < INT_MIN as i128 {
1169            Datum::new(PrimitiveType::Int, PrimitiveLiteral::BelowMin)
1170        } else {
1171            Datum::int(val.into() as i32)
1172        }
1173    }
1174
1175    fn i128_to_i64<T: Into<i128> + PartialOrd<i128>>(val: T) -> Datum {
1176        if val > LONG_MAX as i128 {
1177            Datum::new(PrimitiveType::Long, PrimitiveLiteral::AboveMax)
1178        } else if val < LONG_MIN as i128 {
1179            Datum::new(PrimitiveType::Long, PrimitiveLiteral::BelowMin)
1180        } else {
1181            Datum::long(val.into() as i64)
1182        }
1183    }
1184
1185    fn string_to_i128<S: AsRef<str>>(s: S) -> Result<i128> {
1186        s.as_ref().parse::<i128>().map_err(|e| {
1187            Error::new(ErrorKind::DataInvalid, "Can't parse string to i128.").with_source(e)
1188        })
1189    }
1190
    /// Convert the datum to `target_type`.
    ///
    /// The conversion rule is selected from the datum's stored literal variant and
    /// the requested primitive type; the datum's current type is only consulted by
    /// the identity fallback (same type in, same datum out).
    ///
    /// Narrowing integer conversions do not fail: out-of-range values are turned
    /// into the `AboveMax` / `BelowMin` marker literals by the `*_to_i32` /
    /// `*_to_i64` helpers.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::DataInvalid` error when `target_type` is not a
    /// primitive type, when no conversion rule matches, or when a string literal
    /// cannot be parsed into the requested type.
    pub fn to(self, target_type: &Type) -> Result<Datum> {
        match target_type {
            Type::Primitive(target_primitive_type) => {
                match (&self.literal, &self.r#type, target_primitive_type) {
                    // 32-bit integer literals: re-tag as int/date, or widen to long.
                    (PrimitiveLiteral::Int(val), _, PrimitiveType::Int) => Ok(Datum::int(*val)),
                    (PrimitiveLiteral::Int(val), _, PrimitiveType::Date) => Ok(Datum::date(*val)),
                    (PrimitiveLiteral::Int(val), _, PrimitiveType::Long) => Ok(Datum::long(*val)),
                    // 64-bit integer literals: narrow to int (range-checked) or
                    // reinterpret as microsecond timestamps.
                    (PrimitiveLiteral::Long(val), _, PrimitiveType::Int) => {
                        Ok(Datum::i64_to_i32(*val))
                    }
                    (PrimitiveLiteral::Long(val), _, PrimitiveType::Timestamp) => {
                        Ok(Datum::timestamp_micros(*val))
                    }
                    (PrimitiveLiteral::Long(val), _, PrimitiveType::Timestamptz) => {
                        Ok(Datum::timestamptz_micros(*val))
                    }
                    // Let's wait with nano's until this clears up: https://github.com/apache/iceberg/pull/11775
                    // 128-bit integer literals: narrow to long (range-checked).
                    (PrimitiveLiteral::Int128(val), _, PrimitiveType::Long) => {
                        Ok(Datum::i128_to_i64(*val))
                    }

                    // String literals: parse into the requested type. Integers are
                    // parsed as i128 first and then narrowed with range checking.
                    (PrimitiveLiteral::String(val), _, PrimitiveType::Boolean) => {
                        Datum::bool_from_str(val)
                    }
                    (PrimitiveLiteral::String(val), _, PrimitiveType::Int) => {
                        Datum::string_to_i128(val).map(Datum::i128_to_i32)
                    }
                    (PrimitiveLiteral::String(val), _, PrimitiveType::Long) => {
                        Datum::string_to_i128(val).map(Datum::i128_to_i64)
                    }
                    (PrimitiveLiteral::String(val), _, PrimitiveType::Timestamp) => {
                        Datum::timestamp_from_str(val)
                    }
                    (PrimitiveLiteral::String(val), _, PrimitiveType::Timestamptz) => {
                        Datum::timestamptz_from_str(val)
                    }

                    // TODO: implement more type conversions
                    // Identity fallback: the datum already has the requested type.
                    // This arm must stay after the specific rules above.
                    (_, self_type, target_type) if self_type == target_type => Ok(self),
                    // No rule applies for this literal/target combination.
                    _ => Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "Can't convert datum from {} type to {} type.",
                            self.r#type, target_primitive_type
                        ),
                    )),
                }
            }
            // Struct, list and map targets are never valid for a primitive datum.
            _ => Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Can't convert datum from {} type to {} type.",
                    self.r#type, target_type
                ),
            )),
        }
    }
1249
1250    /// Get the primitive literal from datum.
1251    pub fn literal(&self) -> &PrimitiveLiteral {
1252        &self.literal
1253    }
1254
1255    /// Get the primitive type from datum.
1256    pub fn data_type(&self) -> &PrimitiveType {
1257        &self.r#type
1258    }
1259
1260    /// Returns true if the Literal represents a primitive type
1261    /// that can be a NaN, and that it's value is NaN
1262    pub fn is_nan(&self) -> bool {
1263        match self.literal {
1264            PrimitiveLiteral::Double(val) => val.is_nan(),
1265            PrimitiveLiteral::Float(val) => val.is_nan(),
1266            _ => false,
1267        }
1268    }
1269
1270    /// Returns a human-readable string representation of this literal.
1271    ///
1272    /// For string literals, this returns the raw string value without quotes.
1273    /// For all other literals, it falls back to [`to_string()`].
1274    pub(crate) fn to_human_string(&self) -> String {
1275        match self.literal() {
1276            PrimitiveLiteral::String(s) => s.to_string(),
1277            _ => self.to_string(),
1278        }
1279    }
1280}
1281
/// Map is a collection of key-value pairs with a key type and a value type.
///
/// It is used in `Literal::Map`. To make it hashable, the key-value pairs are
/// kept in a separate vector in insertion order, so that the map can be hashed
/// in a deterministic way. This also means that the order of the key-value pairs
/// matters for the hash value (and for the derived equality); use
/// `Map::has_same_content` to compare two maps regardless of order.
///
/// When converting to Arrow (e.g., for Iceberg), the map should be represented using
/// the default field name "key_value".
///
/// Example:
///
/// ```text
/// let key_value_field = Field::new(
///     DEFAULT_MAP_FIELD_NAME,
///     arrow_schema::DataType::Struct(vec![
///         Arc::new(key_field.clone()),
///         Arc::new(value_field.clone()),
///     ].into()),
///     false
/// );
/// ```
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Map {
    // Maps each key to the position of its entry in `pair`.
    index: HashMap<Literal, usize>,
    // Entries in insertion order; a value of `None` represents a null value.
    pair: Vec<(Literal, Option<Literal>)>,
}
1307
1308impl Map {
1309    /// Creates a new empty map.
1310    pub fn new() -> Self {
1311        Self {
1312            index: HashMap::new(),
1313            pair: Vec::new(),
1314        }
1315    }
1316
1317    /// Return the number of key-value pairs in the map.
1318    pub fn len(&self) -> usize {
1319        self.pair.len()
1320    }
1321
1322    /// Returns true if the map contains no elements.
1323    pub fn is_empty(&self) -> bool {
1324        self.pair.is_empty()
1325    }
1326
1327    /// Inserts a key-value pair into the map.
1328    /// If the map did not have this key present, None is returned.
1329    /// If the map did have this key present, the value is updated, and the old value is returned.
1330    pub fn insert(&mut self, key: Literal, value: Option<Literal>) -> Option<Option<Literal>> {
1331        if let Some(index) = self.index.get(&key) {
1332            let old_value = std::mem::replace(&mut self.pair[*index].1, value);
1333            Some(old_value)
1334        } else {
1335            self.pair.push((key.clone(), value));
1336            self.index.insert(key, self.pair.len() - 1);
1337            None
1338        }
1339    }
1340
1341    /// Returns a reference to the value corresponding to the key.
1342    /// If the key is not present in the map, None is returned.
1343    pub fn get(&self, key: &Literal) -> Option<&Option<Literal>> {
1344        self.index.get(key).map(|index| &self.pair[*index].1)
1345    }
1346
1347    /// The order of map is matter, so this method used to compare two maps has same key-value pairs without considering the order.
1348    pub fn has_same_content(&self, other: &Map) -> bool {
1349        if self.len() != other.len() {
1350            return false;
1351        }
1352
1353        for (key, value) in &self.pair {
1354            match other.get(key) {
1355                Some(other_value) if value == other_value => (),
1356                _ => return false,
1357            }
1358        }
1359
1360        true
1361    }
1362}
1363
1364impl Default for Map {
1365    fn default() -> Self {
1366        Self::new()
1367    }
1368}
1369
1370impl Hash for Map {
1371    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1372        for (key, value) in &self.pair {
1373            key.hash(state);
1374            value.hash(state);
1375        }
1376    }
1377}
1378
1379impl FromIterator<(Literal, Option<Literal>)> for Map {
1380    fn from_iter<T: IntoIterator<Item = (Literal, Option<Literal>)>>(iter: T) -> Self {
1381        let mut map = Map::new();
1382        for (key, value) in iter {
1383            map.insert(key, value);
1384        }
1385        map
1386    }
1387}
1388
1389impl IntoIterator for Map {
1390    type Item = (Literal, Option<Literal>);
1391    type IntoIter = std::vec::IntoIter<Self::Item>;
1392
1393    fn into_iter(self) -> Self::IntoIter {
1394        self.pair.into_iter()
1395    }
1396}
1397
1398impl<const N: usize> From<[(Literal, Option<Literal>); N]> for Map {
1399    fn from(value: [(Literal, Option<Literal>); N]) -> Self {
1400        value.iter().cloned().collect()
1401    }
1402}
1403
/// Values present in iceberg type
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Literal {
    /// A primitive value
    Primitive(PrimitiveLiteral),
    /// A struct is a tuple of typed values.
    ///
    /// Each field in the tuple is named and has an integer id that is unique in the table schema.
    /// Each field can be either optional or required, meaning that values can (or cannot) be null.
    /// Fields may be any type. Fields may have an optional comment or doc string.
    /// Fields can have default values.
    Struct(Struct),
    /// A list is a collection of values with some element type.
    ///
    /// The element field has an integer id that is unique in the table schema.
    /// Elements can be either optional or required. Element types may be any type.
    /// A `None` entry represents a null element.
    List(Vec<Option<Literal>>),
    /// A map is a collection of key-value pairs with a key type and a value type.
    ///
    /// The key field and the value field each have an integer id that is unique in the table schema.
    /// Map keys are required and map values can be either optional or required.
    /// Both map keys and map values may be any type, including nested types.
    Map(Map),
}
1422
1423impl Literal {
1424    /// Creates a boolean value.
1425    ///
1426    /// Example:
1427    /// ```rust
1428    /// use iceberg::spec::{Literal, PrimitiveLiteral};
1429    /// let t = Literal::bool(true);
1430    ///
1431    /// assert_eq!(Literal::Primitive(PrimitiveLiteral::Boolean(true)), t);
1432    /// ```
1433    pub fn bool<T: Into<bool>>(t: T) -> Self {
1434        Self::Primitive(PrimitiveLiteral::Boolean(t.into()))
1435    }
1436
1437    /// Creates a boolean value from string.
1438    /// See [Parse bool from str](https://doc.rust-lang.org/stable/std/primitive.bool.html#impl-FromStr-for-bool) for reference.
1439    ///
1440    /// Example:
1441    /// ```rust
1442    /// use iceberg::spec::{Literal, PrimitiveLiteral};
1443    /// let t = Literal::bool_from_str("false").unwrap();
1444    ///
1445    /// assert_eq!(Literal::Primitive(PrimitiveLiteral::Boolean(false)), t);
1446    /// ```
1447    pub fn bool_from_str<S: AsRef<str>>(s: S) -> Result<Self> {
1448        let v = s.as_ref().parse::<bool>().map_err(|e| {
1449            Error::new(ErrorKind::DataInvalid, "Can't parse string to bool.").with_source(e)
1450        })?;
1451        Ok(Self::Primitive(PrimitiveLiteral::Boolean(v)))
1452    }
1453
1454    /// Creates an 32bit integer.
1455    ///
1456    /// Example:
1457    /// ```rust
1458    /// use iceberg::spec::{Literal, PrimitiveLiteral};
1459    /// let t = Literal::int(23i8);
1460    ///
1461    /// assert_eq!(Literal::Primitive(PrimitiveLiteral::Int(23)), t);
1462    /// ```
1463    pub fn int<T: Into<i32>>(t: T) -> Self {
1464        Self::Primitive(PrimitiveLiteral::Int(t.into()))
1465    }
1466
1467    /// Creates an 64bit integer.
1468    ///
1469    /// Example:
1470    /// ```rust
1471    /// use iceberg::spec::{Literal, PrimitiveLiteral};
1472    /// let t = Literal::long(24i8);
1473    ///
1474    /// assert_eq!(Literal::Primitive(PrimitiveLiteral::Long(24)), t);
1475    /// ```
1476    pub fn long<T: Into<i64>>(t: T) -> Self {
1477        Self::Primitive(PrimitiveLiteral::Long(t.into()))
1478    }
1479
1480    /// Creates an 32bit floating point number.
1481    ///
1482    /// Example:
1483    /// ```rust
1484    /// use iceberg::spec::{Literal, PrimitiveLiteral};
1485    /// use ordered_float::OrderedFloat;
1486    /// let t = Literal::float(32.1f32);
1487    ///
1488    /// assert_eq!(
1489    ///     Literal::Primitive(PrimitiveLiteral::Float(OrderedFloat(32.1))),
1490    ///     t
1491    /// );
1492    /// ```
1493    pub fn float<T: Into<f32>>(t: T) -> Self {
1494        Self::Primitive(PrimitiveLiteral::Float(OrderedFloat(t.into())))
1495    }
1496
1497    /// Creates an 32bit floating point number.
1498    ///
1499    /// Example:
1500    /// ```rust
1501    /// use iceberg::spec::{Literal, PrimitiveLiteral};
1502    /// use ordered_float::OrderedFloat;
1503    /// let t = Literal::double(32.1f64);
1504    ///
1505    /// assert_eq!(
1506    ///     Literal::Primitive(PrimitiveLiteral::Double(OrderedFloat(32.1))),
1507    ///     t
1508    /// );
1509    /// ```
1510    pub fn double<T: Into<f64>>(t: T) -> Self {
1511        Self::Primitive(PrimitiveLiteral::Double(OrderedFloat(t.into())))
1512    }
1513
1514    /// Creates date literal from number of days from unix epoch directly.
1515    pub fn date(days: i32) -> Self {
1516        Self::Primitive(PrimitiveLiteral::Int(days))
1517    }
1518
1519    /// Creates a date in `%Y-%m-%d` format, assume in utc timezone.
1520    ///
1521    /// See [`NaiveDate::from_str`].
1522    ///
1523    /// Example
1524    /// ```rust
1525    /// use iceberg::spec::Literal;
1526    /// let t = Literal::date_from_str("1970-01-03").unwrap();
1527    ///
1528    /// assert_eq!(Literal::date(2), t);
1529    /// ```
1530    pub fn date_from_str<S: AsRef<str>>(s: S) -> Result<Self> {
1531        let t = s.as_ref().parse::<NaiveDate>().map_err(|e| {
1532            Error::new(
1533                ErrorKind::DataInvalid,
1534                format!("Can't parse date from string: {}", s.as_ref()),
1535            )
1536            .with_source(e)
1537        })?;
1538
1539        Ok(Self::date(date_from_naive_date(t)))
1540    }
1541
1542    /// Create a date from calendar date (year, month and day).
1543    ///
1544    /// See [`NaiveDate::from_ymd_opt`].
1545    ///
1546    /// Example:
1547    ///
1548    ///```rust
1549    /// use iceberg::spec::Literal;
1550    /// let t = Literal::date_from_ymd(1970, 1, 5).unwrap();
1551    ///
1552    /// assert_eq!(Literal::date(4), t);
1553    /// ```
1554    pub fn date_from_ymd(year: i32, month: u32, day: u32) -> Result<Self> {
1555        let t = NaiveDate::from_ymd_opt(year, month, day).ok_or_else(|| {
1556            Error::new(
1557                ErrorKind::DataInvalid,
1558                format!("Can't create date from year: {year}, month: {month}, day: {day}"),
1559            )
1560        })?;
1561
1562        Ok(Self::date(date_from_naive_date(t)))
1563    }
1564
1565    /// Creates time in microseconds directly
1566    pub fn time(value: i64) -> Self {
1567        Self::Primitive(PrimitiveLiteral::Long(value))
1568    }
1569
1570    /// Creates time literal from [`chrono::NaiveTime`].
1571    fn time_from_naive_time(t: NaiveTime) -> Self {
1572        let duration = t - unix_epoch().time();
1573        // It's safe to unwrap here since less than 24 hours will never overflow.
1574        let micro_secs = duration.num_microseconds().unwrap();
1575
1576        Literal::time(micro_secs)
1577    }
1578
1579    /// Creates time in microseconds in `%H:%M:%S:.f` format.
1580    ///
1581    /// See [`NaiveTime::from_str`] for details.
1582    ///
1583    /// Example:
1584    /// ```rust
1585    /// use iceberg::spec::Literal;
1586    /// let t = Literal::time_from_str("01:02:01.888999777").unwrap();
1587    ///
1588    /// let micro_secs = {
1589    ///     1 * 3600 * 1_000_000 + // 1 hour
1590    ///     2 * 60 * 1_000_000 +   // 2 minutes
1591    ///     1 * 1_000_000 + // 1 second
1592    ///     888999 // microseconds
1593    /// };
1594    /// assert_eq!(Literal::time(micro_secs), t);
1595    /// ```
1596    pub fn time_from_str<S: AsRef<str>>(s: S) -> Result<Self> {
1597        let t = s.as_ref().parse::<NaiveTime>().map_err(|e| {
1598            Error::new(
1599                ErrorKind::DataInvalid,
1600                format!("Can't parse time from string: {}", s.as_ref()),
1601            )
1602            .with_source(e)
1603        })?;
1604
1605        Ok(Self::time_from_naive_time(t))
1606    }
1607
1608    /// Creates time literal from hour, minute, second, and microseconds.
1609    ///
1610    /// See [`NaiveTime::from_hms_micro_opt`].
1611    ///
1612    /// Example:
1613    /// ```rust
1614    /// use iceberg::spec::Literal;
1615    /// let t = Literal::time_from_hms_micro(22, 15, 33, 111).unwrap();
1616    ///
1617    /// assert_eq!(Literal::time_from_str("22:15:33.000111").unwrap(), t);
1618    /// ```
1619    pub fn time_from_hms_micro(hour: u32, min: u32, sec: u32, micro: u32) -> Result<Self> {
1620        let t = NaiveTime::from_hms_micro_opt(hour, min, sec, micro)
1621            .ok_or_else(|| Error::new(
1622                ErrorKind::DataInvalid,
1623                format!("Can't create time from hour: {hour}, min: {min}, second: {sec}, microsecond: {micro}"),
1624            ))?;
1625        Ok(Self::time_from_naive_time(t))
1626    }
1627
1628    /// Creates a timestamp from unix epoch in microseconds.
1629    pub fn timestamp(value: i64) -> Self {
1630        Self::Primitive(PrimitiveLiteral::Long(value))
1631    }
1632
1633    /// Creates a timestamp with timezone from unix epoch in microseconds.
1634    pub fn timestamptz(value: i64) -> Self {
1635        Self::Primitive(PrimitiveLiteral::Long(value))
1636    }
1637
1638    /// Creates a timestamp from unix epoch in nanoseconds.
1639    pub(crate) fn timestamp_nano(value: i64) -> Self {
1640        Self::Primitive(PrimitiveLiteral::Long(value))
1641    }
1642
1643    /// Creates a timestamp with timezone from unix epoch in nanoseconds.
1644    pub(crate) fn timestamptz_nano(value: i64) -> Self {
1645        Self::Primitive(PrimitiveLiteral::Long(value))
1646    }
1647
1648    /// Creates a timestamp from [`DateTime`].
1649    pub fn timestamp_from_datetime<T: TimeZone>(dt: DateTime<T>) -> Self {
1650        Self::timestamp(dt.with_timezone(&Utc).timestamp_micros())
1651    }
1652
1653    /// Creates a timestamp with timezone from [`DateTime`].
1654    pub fn timestamptz_from_datetime<T: TimeZone>(dt: DateTime<T>) -> Self {
1655        Self::timestamptz(dt.with_timezone(&Utc).timestamp_micros())
1656    }
1657
1658    /// Parse a timestamp in RFC3339 format.
1659    ///
1660    /// See [`DateTime<Utc>::from_str`].
1661    ///
1662    /// Example:
1663    ///
1664    /// ```rust
1665    /// use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime};
1666    /// use iceberg::spec::Literal;
1667    /// let t = Literal::timestamp_from_str("2012-12-12 12:12:12.8899-04:00").unwrap();
1668    ///
1669    /// let t2 = {
1670    ///     let date = NaiveDate::from_ymd_opt(2012, 12, 12).unwrap();
1671    ///     let time = NaiveTime::from_hms_micro_opt(12, 12, 12, 889900).unwrap();
1672    ///     let dt = NaiveDateTime::new(date, time);
1673    ///     Literal::timestamp_from_datetime(DateTime::<FixedOffset>::from_local(
1674    ///         dt,
1675    ///         FixedOffset::west_opt(4 * 3600).unwrap(),
1676    ///     ))
1677    /// };
1678    ///
1679    /// assert_eq!(t, t2);
1680    /// ```
1681    pub fn timestamp_from_str<S: AsRef<str>>(s: S) -> Result<Self> {
1682        let dt = DateTime::<Utc>::from_str(s.as_ref()).map_err(|e| {
1683            Error::new(ErrorKind::DataInvalid, "Can't parse datetime.").with_source(e)
1684        })?;
1685
1686        Ok(Self::timestamp_from_datetime(dt))
1687    }
1688
1689    /// Similar to [`Literal::timestamp_from_str`], but return timestamp with timezone literal.
1690    pub fn timestamptz_from_str<S: AsRef<str>>(s: S) -> Result<Self> {
1691        let dt = DateTime::<Utc>::from_str(s.as_ref()).map_err(|e| {
1692            Error::new(ErrorKind::DataInvalid, "Can't parse datetime.").with_source(e)
1693        })?;
1694
1695        Ok(Self::timestamptz_from_datetime(dt))
1696    }
1697
1698    /// Creates a string literal.
1699    pub fn string<S: ToString>(s: S) -> Self {
1700        Self::Primitive(PrimitiveLiteral::String(s.to_string()))
1701    }
1702
1703    /// Creates uuid literal.
1704    pub fn uuid(uuid: Uuid) -> Self {
1705        Self::Primitive(PrimitiveLiteral::UInt128(uuid.as_u128()))
1706    }
1707
1708    /// Creates uuid from str. See [`Uuid::parse_str`].
1709    ///
1710    /// Example:
1711    ///
1712    /// ```rust
1713    /// use iceberg::spec::Literal;
1714    /// use uuid::Uuid;
1715    /// let t1 = Literal::uuid_from_str("a1a2a3a4-b1b2-c1c2-d1d2-d3d4d5d6d7d8").unwrap();
1716    /// let t2 = Literal::uuid(Uuid::from_u128_le(0xd8d7d6d5d4d3d2d1c2c1b2b1a4a3a2a1));
1717    ///
1718    /// assert_eq!(t1, t2);
1719    /// ```
1720    pub fn uuid_from_str<S: AsRef<str>>(s: S) -> Result<Self> {
1721        let uuid = Uuid::parse_str(s.as_ref()).map_err(|e| {
1722            Error::new(
1723                ErrorKind::DataInvalid,
1724                format!("Can't parse uuid from string: {}", s.as_ref()),
1725            )
1726            .with_source(e)
1727        })?;
1728        Ok(Self::uuid(uuid))
1729    }
1730
1731    /// Creates a fixed literal from bytes.
1732    ///
1733    /// Example:
1734    ///
1735    /// ```rust
1736    /// use iceberg::spec::{Literal, PrimitiveLiteral};
1737    /// let t1 = Literal::fixed(vec![1u8, 2u8]);
1738    /// let t2 = Literal::Primitive(PrimitiveLiteral::Binary(vec![1u8, 2u8]));
1739    ///
1740    /// assert_eq!(t1, t2);
1741    /// ```
1742    pub fn fixed<I: IntoIterator<Item = u8>>(input: I) -> Self {
1743        Literal::Primitive(PrimitiveLiteral::Binary(input.into_iter().collect()))
1744    }
1745
1746    /// Creates a binary literal from bytes.
1747    ///
1748    /// Example:
1749    ///
1750    /// ```rust
1751    /// use iceberg::spec::{Literal, PrimitiveLiteral};
1752    /// let t1 = Literal::binary(vec![1u8, 2u8]);
1753    /// let t2 = Literal::Primitive(PrimitiveLiteral::Binary(vec![1u8, 2u8]));
1754    ///
1755    /// assert_eq!(t1, t2);
1756    /// ```
1757    pub fn binary<I: IntoIterator<Item = u8>>(input: I) -> Self {
1758        Literal::Primitive(PrimitiveLiteral::Binary(input.into_iter().collect()))
1759    }
1760
1761    /// Creates a decimal literal.
1762    pub fn decimal(decimal: i128) -> Self {
1763        Self::Primitive(PrimitiveLiteral::Int128(decimal))
1764    }
1765
1766    /// Creates decimal literal from string. See [`Decimal::from_str_exact`].
1767    ///
1768    /// Example:
1769    ///
1770    /// ```rust
1771    /// use iceberg::spec::Literal;
1772    /// use rust_decimal::Decimal;
1773    /// let t1 = Literal::decimal(12345);
1774    /// let t2 = Literal::decimal_from_str("123.45").unwrap();
1775    ///
1776    /// assert_eq!(t1, t2);
1777    /// ```
1778    pub fn decimal_from_str<S: AsRef<str>>(s: S) -> Result<Self> {
1779        let decimal = Decimal::from_str_exact(s.as_ref()).map_err(|e| {
1780            Error::new(ErrorKind::DataInvalid, "Can't parse decimal.").with_source(e)
1781        })?;
1782        Ok(Self::decimal(decimal.mantissa()))
1783    }
1784
1785    /// Attempts to convert the Literal to a PrimitiveLiteral
1786    pub fn as_primitive_literal(&self) -> Option<PrimitiveLiteral> {
1787        match self {
1788            Literal::Primitive(primitive) => Some(primitive.clone()),
1789            _ => None,
1790        }
1791    }
1792}
1793
/// The partition struct stores the tuple of partition values for each file.
/// Its type is derived from the partition fields of the partition spec used to write the manifest file.
/// In v2, the partition struct’s field ids must match the ids from the partition spec.
///
/// Values are kept by position. A `None` slot represents a null field value.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Struct {
    /// Field values in positional order; `None` marks a null value.
    fields: Vec<Option<Literal>>,
}
1802
1803impl Struct {
1804    /// Create a empty struct.
1805    pub fn empty() -> Self {
1806        Self { fields: Vec::new() }
1807    }
1808
1809    /// Create a iterator to read the field in order of field_value.
1810    pub fn iter(&self) -> impl ExactSizeIterator<Item = Option<&Literal>> {
1811        self.fields.iter().map(|field| field.as_ref())
1812    }
1813
1814    /// returns true if the field at position `index` is null
1815    pub fn is_null_at_index(&self, index: usize) -> bool {
1816        self.fields[index].is_none()
1817    }
1818
1819    /// Return fields in the struct.
1820    pub fn fields(&self) -> &[Option<Literal>] {
1821        &self.fields
1822    }
1823}
1824
1825impl Index<usize> for Struct {
1826    type Output = Option<Literal>;
1827
1828    fn index(&self, idx: usize) -> &Self::Output {
1829        &self.fields[idx]
1830    }
1831}
1832
1833impl IntoIterator for Struct {
1834    type Item = Option<Literal>;
1835
1836    type IntoIter = std::vec::IntoIter<Option<Literal>>;
1837
1838    fn into_iter(self) -> Self::IntoIter {
1839        self.fields.into_iter()
1840    }
1841}
1842
1843impl FromIterator<Option<Literal>> for Struct {
1844    fn from_iter<I: IntoIterator<Item = Option<Literal>>>(iter: I) -> Self {
1845        Struct {
1846            fields: iter.into_iter().collect(),
1847        }
1848    }
1849}
1850
1851impl Literal {
1852    /// Create iceberg value from a json value
1853    ///
1854    /// See [this spec](https://iceberg.apache.org/spec/#json-single-value-serialization) for reference.
1855    pub fn try_from_json(value: JsonValue, data_type: &Type) -> Result<Option<Self>> {
1856        match data_type {
1857            Type::Primitive(primitive) => match (primitive, value) {
1858                (PrimitiveType::Boolean, JsonValue::Bool(bool)) => {
1859                    Ok(Some(Literal::Primitive(PrimitiveLiteral::Boolean(bool))))
1860                }
1861                (PrimitiveType::Int, JsonValue::Number(number)) => {
1862                    Ok(Some(Literal::Primitive(PrimitiveLiteral::Int(
1863                        number
1864                            .as_i64()
1865                            .ok_or(Error::new(
1866                                crate::ErrorKind::DataInvalid,
1867                                "Failed to convert json number to int",
1868                            ))?
1869                            .try_into()?,
1870                    ))))
1871                }
1872                (PrimitiveType::Long, JsonValue::Number(number)) => Ok(Some(Literal::Primitive(
1873                    PrimitiveLiteral::Long(number.as_i64().ok_or(Error::new(
1874                        crate::ErrorKind::DataInvalid,
1875                        "Failed to convert json number to long",
1876                    ))?),
1877                ))),
1878                (PrimitiveType::Float, JsonValue::Number(number)) => Ok(Some(Literal::Primitive(
1879                    PrimitiveLiteral::Float(OrderedFloat(number.as_f64().ok_or(Error::new(
1880                        crate::ErrorKind::DataInvalid,
1881                        "Failed to convert json number to float",
1882                    ))? as f32)),
1883                ))),
1884                (PrimitiveType::Double, JsonValue::Number(number)) => Ok(Some(Literal::Primitive(
1885                    PrimitiveLiteral::Double(OrderedFloat(number.as_f64().ok_or(Error::new(
1886                        crate::ErrorKind::DataInvalid,
1887                        "Failed to convert json number to double",
1888                    ))?)),
1889                ))),
1890                (PrimitiveType::Date, JsonValue::String(s)) => {
1891                    Ok(Some(Literal::Primitive(PrimitiveLiteral::Int(
1892                        date::date_to_days(&NaiveDate::parse_from_str(&s, "%Y-%m-%d")?),
1893                    ))))
1894                }
1895                (PrimitiveType::Time, JsonValue::String(s)) => {
1896                    Ok(Some(Literal::Primitive(PrimitiveLiteral::Long(
1897                        time::time_to_microseconds(&NaiveTime::parse_from_str(&s, "%H:%M:%S%.f")?),
1898                    ))))
1899                }
1900                (PrimitiveType::Timestamp, JsonValue::String(s)) => Ok(Some(Literal::Primitive(
1901                    PrimitiveLiteral::Long(timestamp::datetime_to_microseconds(
1902                        &NaiveDateTime::parse_from_str(&s, "%Y-%m-%dT%H:%M:%S%.f")?,
1903                    )),
1904                ))),
1905                (PrimitiveType::Timestamptz, JsonValue::String(s)) => {
1906                    Ok(Some(Literal::Primitive(PrimitiveLiteral::Long(
1907                        timestamptz::datetimetz_to_microseconds(&Utc.from_utc_datetime(
1908                            &NaiveDateTime::parse_from_str(&s, "%Y-%m-%dT%H:%M:%S%.f+00:00")?,
1909                        )),
1910                    ))))
1911                }
1912                (PrimitiveType::String, JsonValue::String(s)) => {
1913                    Ok(Some(Literal::Primitive(PrimitiveLiteral::String(s))))
1914                }
1915                (PrimitiveType::Uuid, JsonValue::String(s)) => Ok(Some(Literal::Primitive(
1916                    PrimitiveLiteral::UInt128(Uuid::parse_str(&s)?.as_u128()),
1917                ))),
1918                (PrimitiveType::Fixed(_), JsonValue::String(_)) => todo!(),
1919                (PrimitiveType::Binary, JsonValue::String(_)) => todo!(),
1920                (
1921                    PrimitiveType::Decimal {
1922                        precision: _,
1923                        scale,
1924                    },
1925                    JsonValue::String(s),
1926                ) => {
1927                    let mut decimal = Decimal::from_str_exact(&s)?;
1928                    decimal.rescale(*scale);
1929                    Ok(Some(Literal::Primitive(PrimitiveLiteral::Int128(
1930                        decimal.mantissa(),
1931                    ))))
1932                }
1933                (_, JsonValue::Null) => Ok(None),
1934                (i, j) => Err(Error::new(
1935                    crate::ErrorKind::DataInvalid,
1936                    format!(
1937                        "The json value {} doesn't fit to the iceberg type {}.",
1938                        j, i
1939                    ),
1940                )),
1941            },
1942            Type::Struct(schema) => {
1943                if let JsonValue::Object(mut object) = value {
1944                    Ok(Some(Literal::Struct(Struct::from_iter(
1945                        schema.fields().iter().map(|field| {
1946                            object.remove(&field.id.to_string()).and_then(|value| {
1947                                Literal::try_from_json(value, &field.field_type)
1948                                    .and_then(|value| {
1949                                        value.ok_or(Error::new(
1950                                            ErrorKind::DataInvalid,
1951                                            "Key of map cannot be null",
1952                                        ))
1953                                    })
1954                                    .ok()
1955                            })
1956                        }),
1957                    ))))
1958                } else {
1959                    Err(Error::new(
1960                        crate::ErrorKind::DataInvalid,
1961                        "The json value for a struct type must be an object.",
1962                    ))
1963                }
1964            }
1965            Type::List(list) => {
1966                if let JsonValue::Array(array) = value {
1967                    Ok(Some(Literal::List(
1968                        array
1969                            .into_iter()
1970                            .map(|value| {
1971                                Literal::try_from_json(value, &list.element_field.field_type)
1972                            })
1973                            .collect::<Result<Vec<_>>>()?,
1974                    )))
1975                } else {
1976                    Err(Error::new(
1977                        crate::ErrorKind::DataInvalid,
1978                        "The json value for a list type must be an array.",
1979                    ))
1980                }
1981            }
1982            Type::Map(map) => {
1983                if let JsonValue::Object(mut object) = value {
1984                    if let (Some(JsonValue::Array(keys)), Some(JsonValue::Array(values))) =
1985                        (object.remove("keys"), object.remove("values"))
1986                    {
1987                        Ok(Some(Literal::Map(Map::from_iter(
1988                            keys.into_iter()
1989                                .zip(values.into_iter())
1990                                .map(|(key, value)| {
1991                                    Ok((
1992                                        Literal::try_from_json(key, &map.key_field.field_type)
1993                                            .and_then(|value| {
1994                                                value.ok_or(Error::new(
1995                                                    ErrorKind::DataInvalid,
1996                                                    "Key of map cannot be null",
1997                                                ))
1998                                            })?,
1999                                        Literal::try_from_json(value, &map.value_field.field_type)?,
2000                                    ))
2001                                })
2002                                .collect::<Result<Vec<_>>>()?,
2003                        ))))
2004                    } else {
2005                        Err(Error::new(
2006                            crate::ErrorKind::DataInvalid,
2007                            "The json value for a list type must be an array.",
2008                        ))
2009                    }
2010                } else {
2011                    Err(Error::new(
2012                        crate::ErrorKind::DataInvalid,
2013                        "The json value for a list type must be an array.",
2014                    ))
2015                }
2016            }
2017        }
2018    }
2019
2020    /// Converting iceberg value to json value.
2021    ///
2022    /// See [this spec](https://iceberg.apache.org/spec/#json-single-value-serialization) for reference.
2023    pub fn try_into_json(self, r#type: &Type) -> Result<JsonValue> {
2024        match (self, r#type) {
2025            (Literal::Primitive(prim), Type::Primitive(prim_type)) => match (prim_type, prim) {
2026                (PrimitiveType::Boolean, PrimitiveLiteral::Boolean(val)) => {
2027                    Ok(JsonValue::Bool(val))
2028                }
2029                (PrimitiveType::Int, PrimitiveLiteral::Int(val)) => {
2030                    Ok(JsonValue::Number((val).into()))
2031                }
2032                (PrimitiveType::Long, PrimitiveLiteral::Long(val)) => {
2033                    Ok(JsonValue::Number((val).into()))
2034                }
2035                (PrimitiveType::Float, PrimitiveLiteral::Float(val)) => {
2036                    match Number::from_f64(val.0 as f64) {
2037                        Some(number) => Ok(JsonValue::Number(number)),
2038                        None => Ok(JsonValue::Null),
2039                    }
2040                }
2041                (PrimitiveType::Double, PrimitiveLiteral::Double(val)) => {
2042                    match Number::from_f64(val.0) {
2043                        Some(number) => Ok(JsonValue::Number(number)),
2044                        None => Ok(JsonValue::Null),
2045                    }
2046                }
2047                (PrimitiveType::Date, PrimitiveLiteral::Int(val)) => {
2048                    Ok(JsonValue::String(date::days_to_date(val).to_string()))
2049                }
2050                (PrimitiveType::Time, PrimitiveLiteral::Long(val)) => Ok(JsonValue::String(
2051                    time::microseconds_to_time(val).to_string(),
2052                )),
2053                (PrimitiveType::Timestamp, PrimitiveLiteral::Long(val)) => Ok(JsonValue::String(
2054                    timestamp::microseconds_to_datetime(val)
2055                        .format("%Y-%m-%dT%H:%M:%S%.f")
2056                        .to_string(),
2057                )),
2058                (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(val)) => Ok(JsonValue::String(
2059                    timestamptz::microseconds_to_datetimetz(val)
2060                        .format("%Y-%m-%dT%H:%M:%S%.f+00:00")
2061                        .to_string(),
2062                )),
2063                (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(val)) => Ok(JsonValue::String(
2064                    timestamp::nanoseconds_to_datetime(val)
2065                        .format("%Y-%m-%dT%H:%M:%S%.f")
2066                        .to_string(),
2067                )),
2068                (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(val)) => {
2069                    Ok(JsonValue::String(
2070                        timestamptz::nanoseconds_to_datetimetz(val)
2071                            .format("%Y-%m-%dT%H:%M:%S%.f+00:00")
2072                            .to_string(),
2073                    ))
2074                }
2075                (PrimitiveType::String, PrimitiveLiteral::String(val)) => {
2076                    Ok(JsonValue::String(val.clone()))
2077                }
2078                (_, PrimitiveLiteral::UInt128(val)) => {
2079                    Ok(JsonValue::String(Uuid::from_u128(val).to_string()))
2080                }
2081                (_, PrimitiveLiteral::Binary(val)) => Ok(JsonValue::String(val.iter().fold(
2082                    String::new(),
2083                    |mut acc, x| {
2084                        acc.push_str(&format!("{:x}", x));
2085                        acc
2086                    },
2087                ))),
2088                (_, PrimitiveLiteral::Int128(val)) => match r#type {
2089                    Type::Primitive(PrimitiveType::Decimal {
2090                        precision: _precision,
2091                        scale,
2092                    }) => {
2093                        let decimal = Decimal::try_from_i128_with_scale(val, *scale)?;
2094                        Ok(JsonValue::String(decimal.to_string()))
2095                    }
2096                    _ => Err(Error::new(
2097                        ErrorKind::DataInvalid,
2098                        "The iceberg type for decimal literal must be decimal.",
2099                    ))?,
2100                },
2101                _ => Err(Error::new(
2102                    ErrorKind::DataInvalid,
2103                    "The iceberg value doesn't fit to the iceberg type.",
2104                )),
2105            },
2106            (Literal::Struct(s), Type::Struct(struct_type)) => {
2107                let mut id_and_value = Vec::with_capacity(struct_type.fields().len());
2108                for (value, field) in s.into_iter().zip(struct_type.fields()) {
2109                    let json = match value {
2110                        Some(val) => val.try_into_json(&field.field_type)?,
2111                        None => JsonValue::Null,
2112                    };
2113                    id_and_value.push((field.id.to_string(), json));
2114                }
2115                Ok(JsonValue::Object(JsonMap::from_iter(id_and_value)))
2116            }
2117            (Literal::List(list), Type::List(list_type)) => Ok(JsonValue::Array(
2118                list.into_iter()
2119                    .map(|opt| match opt {
2120                        Some(literal) => literal.try_into_json(&list_type.element_field.field_type),
2121                        None => Ok(JsonValue::Null),
2122                    })
2123                    .collect::<Result<Vec<JsonValue>>>()?,
2124            )),
2125            (Literal::Map(map), Type::Map(map_type)) => {
2126                let mut object = JsonMap::with_capacity(2);
2127                let mut json_keys = Vec::with_capacity(map.len());
2128                let mut json_values = Vec::with_capacity(map.len());
2129                for (key, value) in map.into_iter() {
2130                    json_keys.push(key.try_into_json(&map_type.key_field.field_type)?);
2131                    json_values.push(match value {
2132                        Some(literal) => literal.try_into_json(&map_type.value_field.field_type)?,
2133                        None => JsonValue::Null,
2134                    });
2135                }
2136                object.insert("keys".to_string(), JsonValue::Array(json_keys));
2137                object.insert("values".to_string(), JsonValue::Array(json_values));
2138                Ok(JsonValue::Object(object))
2139            }
2140            (value, r#type) => Err(Error::new(
2141                ErrorKind::DataInvalid,
2142                format!(
2143                    "The iceberg value {:?} doesn't fit to the iceberg type {}.",
2144                    value, r#type
2145                ),
2146            )),
2147        }
2148    }
2149
2150    /// Convert Value to the any type
2151    pub fn into_any(self) -> Box<dyn Any> {
2152        match self {
2153            Literal::Primitive(prim) => match prim {
2154                PrimitiveLiteral::Boolean(any) => Box::new(any),
2155                PrimitiveLiteral::Int(any) => Box::new(any),
2156                PrimitiveLiteral::Long(any) => Box::new(any),
2157                PrimitiveLiteral::Float(any) => Box::new(any),
2158                PrimitiveLiteral::Double(any) => Box::new(any),
2159                PrimitiveLiteral::Binary(any) => Box::new(any),
2160                PrimitiveLiteral::String(any) => Box::new(any),
2161                PrimitiveLiteral::UInt128(any) => Box::new(any),
2162                PrimitiveLiteral::Int128(any) => Box::new(any),
2163                PrimitiveLiteral::AboveMax | PrimitiveLiteral::BelowMin => unimplemented!(),
2164            },
2165            _ => unimplemented!(),
2166        }
2167    }
2168}
2169
2170mod date {
2171    use chrono::{DateTime, NaiveDate, TimeDelta, TimeZone, Utc};
2172
2173    pub(crate) fn date_to_days(date: &NaiveDate) -> i32 {
2174        date.signed_duration_since(
2175            // This is always the same and shouldn't fail
2176            NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(),
2177        )
2178        .num_days() as i32
2179    }
2180
2181    pub(crate) fn days_to_date(days: i32) -> NaiveDate {
2182        // This shouldn't fail until the year 262000
2183        (chrono::DateTime::UNIX_EPOCH + TimeDelta::try_days(days as i64).unwrap())
2184            .naive_utc()
2185            .date()
2186    }
2187
2188    /// Returns unix epoch.
2189    pub(crate) fn unix_epoch() -> DateTime<Utc> {
2190        Utc.timestamp_nanos(0)
2191    }
2192
2193    /// Creates date literal from `NaiveDate`, assuming it's utc timezone.
2194    pub(crate) fn date_from_naive_date(date: NaiveDate) -> i32 {
2195        (date - unix_epoch().date_naive()).num_days() as i32
2196    }
2197}
2198
2199mod time {
2200    use chrono::NaiveTime;
2201
2202    pub(crate) fn time_to_microseconds(time: &NaiveTime) -> i64 {
2203        time.signed_duration_since(
2204            // This is always the same and shouldn't fail
2205            NaiveTime::from_num_seconds_from_midnight_opt(0, 0).unwrap(),
2206        )
2207        .num_microseconds()
2208        .unwrap()
2209    }
2210
2211    pub(crate) fn microseconds_to_time(micros: i64) -> NaiveTime {
2212        let (secs, rem) = (micros / 1_000_000, micros % 1_000_000);
2213
2214        NaiveTime::from_num_seconds_from_midnight_opt(secs as u32, rem as u32 * 1_000).unwrap()
2215    }
2216}
2217
2218mod timestamp {
2219    use chrono::{DateTime, NaiveDateTime};
2220
2221    pub(crate) fn datetime_to_microseconds(time: &NaiveDateTime) -> i64 {
2222        time.and_utc().timestamp_micros()
2223    }
2224
2225    pub(crate) fn microseconds_to_datetime(micros: i64) -> NaiveDateTime {
2226        // This shouldn't fail until the year 262000
2227        DateTime::from_timestamp_micros(micros).unwrap().naive_utc()
2228    }
2229
2230    pub(crate) fn nanoseconds_to_datetime(nanos: i64) -> NaiveDateTime {
2231        DateTime::from_timestamp_nanos(nanos).naive_utc()
2232    }
2233}
2234
2235mod timestamptz {
2236    use chrono::{DateTime, Utc};
2237
2238    pub(crate) fn datetimetz_to_microseconds(time: &DateTime<Utc>) -> i64 {
2239        time.timestamp_micros()
2240    }
2241
2242    pub(crate) fn microseconds_to_datetimetz(micros: i64) -> DateTime<Utc> {
2243        let (secs, rem) = (micros / 1_000_000, micros % 1_000_000);
2244
2245        DateTime::from_timestamp(secs, rem as u32 * 1_000).unwrap()
2246    }
2247
2248    pub(crate) fn nanoseconds_to_datetimetz(nanos: i64) -> DateTime<Utc> {
2249        let (secs, rem) = (nanos / 1_000_000_000, nanos % 1_000_000_000);
2250
2251        DateTime::from_timestamp(secs, rem as u32).unwrap()
2252    }
2253}
2254
2255mod _serde {
2256    use serde::de::Visitor;
2257    use serde::ser::{SerializeMap, SerializeSeq, SerializeStruct};
2258    use serde::{Deserialize, Serialize};
2259    use serde_bytes::ByteBuf;
2260    use serde_derive::{Deserialize as DeserializeDerive, Serialize as SerializeDerive};
2261
2262    use super::{Literal, Map, PrimitiveLiteral};
2263    use crate::spec::{MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME, PrimitiveType, Type};
2264    use crate::{Error, ErrorKind};
2265
    #[derive(SerializeDerive, DeserializeDerive, Debug)]
    #[serde(transparent)]
    /// Raw literal representation used for serde. The serialized form is the one used by
    /// the Avro serializer.
    ///
    /// This is a transparent wrapper, so it (de)serializes exactly like the inner
    /// `RawLiteralEnum`.
    pub struct RawLiteral(RawLiteralEnum);
2270
2271    impl RawLiteral {
2272        /// Covert literal to raw literal.
2273        pub fn try_from(literal: Literal, ty: &Type) -> Result<Self, Error> {
2274            Ok(Self(RawLiteralEnum::try_from(literal, ty)?))
2275        }
2276
2277        /// Convert raw literal to literal.
2278        pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> {
2279            self.0.try_into(ty)
2280        }
2281    }
2282
    /// Untyped value model wrapped by [`RawLiteral`].
    ///
    /// Serialization is derived with `#[serde(untagged)]`, so only the payload of the
    /// active variant is written. Deserialization is implemented by hand with a visitor.
    #[derive(SerializeDerive, Clone, Debug)]
    #[serde(untagged)]
    enum RawLiteralEnum {
        /// Absent value.
        Null,
        /// Boolean value.
        Boolean(bool),
        /// 32-bit signed integer.
        Int(i32),
        /// 64-bit signed integer.
        Long(i64),
        /// 32-bit float.
        Float(f32),
        /// 64-bit float.
        Double(f64),
        /// Text value.
        String(String),
        /// Raw byte buffer.
        Bytes(ByteBuf),
        /// Sequence of optional elements.
        List(List),
        /// Map with string keys.
        StringMap(StringMap),
        /// Named fields, written as a serde struct.
        Record(Record),
    }
2298
    /// Named fields of a record, split by whether a value must be present.
    ///
    /// When serialized, all `required` fields are written first, followed by the
    /// `optional` ones.
    #[derive(Clone, Debug)]
    struct Record {
        /// `(name, value)` pairs that always carry a value.
        required: Vec<(String, RawLiteralEnum)>,
        /// `(name, value)` pairs whose value may be absent (`None`).
        optional: Vec<(String, Option<RawLiteralEnum>)>,
    }
2304
2305    impl Serialize for Record {
2306        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2307        where S: serde::Serializer {
2308            let len = self.required.len() + self.optional.len();
2309            let mut record = serializer.serialize_struct("", len)?;
2310            for (k, v) in &self.required {
2311                record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?;
2312            }
2313            for (k, v) in &self.optional {
2314                record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?;
2315            }
2316            record.end()
2317        }
2318    }
2319
    /// Sequence of raw values together with the nullability of its elements.
    #[derive(Clone, Debug)]
    struct List {
        /// Elements in order; `None` stands for a null element.
        list: Vec<Option<RawLiteralEnum>>,
        /// When true, serialization fails if any element is `None`.
        required: bool,
    }
2325
2326    impl Serialize for List {
2327        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2328        where S: serde::Serializer {
2329            let mut seq = serializer.serialize_seq(Some(self.list.len()))?;
2330            for value in &self.list {
2331                if self.required {
2332                    seq.serialize_element(value.as_ref().ok_or_else(|| {
2333                        serde::ser::Error::custom(
2334                            "List element is required, element cannot be null",
2335                        )
2336                    })?)?;
2337                } else {
2338                    seq.serialize_element(&value)?;
2339                }
2340            }
2341            seq.end()
2342        }
2343    }
2344
    /// String-keyed map of raw values together with the nullability of its values.
    #[derive(Clone, Debug)]
    struct StringMap {
        /// `(key, value)` entries in order; `None` stands for a null value.
        raw: Vec<(String, Option<RawLiteralEnum>)>,
        /// When true, serialization fails if any value is `None`.
        required: bool,
    }
2350
2351    impl Serialize for StringMap {
2352        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2353        where S: serde::Serializer {
2354            let mut map = serializer.serialize_map(Some(self.raw.len()))?;
2355            for (k, v) in &self.raw {
2356                if self.required {
2357                    map.serialize_entry(
2358                        k,
2359                        v.as_ref().ok_or_else(|| {
2360                            serde::ser::Error::custom(
2361                                "Map element is required, element cannot be null",
2362                            )
2363                        })?,
2364                    )?;
2365                } else {
2366                    map.serialize_entry(k, v)?;
2367                }
2368            }
2369            map.end()
2370        }
2371    }
2372
2373    impl<'de> Deserialize<'de> for RawLiteralEnum {
2374        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
2375        where D: serde::Deserializer<'de> {
2376            struct RawLiteralVisitor;
2377            impl<'de> Visitor<'de> for RawLiteralVisitor {
2378                type Value = RawLiteralEnum;
2379
2380                fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
2381                    formatter.write_str("expect")
2382                }
2383
2384                fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
2385                where E: serde::de::Error {
2386                    Ok(RawLiteralEnum::Boolean(v))
2387                }
2388
2389                fn visit_i32<E>(self, v: i32) -> Result<Self::Value, E>
2390                where E: serde::de::Error {
2391                    Ok(RawLiteralEnum::Int(v))
2392                }
2393
2394                fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
2395                where E: serde::de::Error {
2396                    Ok(RawLiteralEnum::Long(v))
2397                }
2398
2399                /// Used in json
2400                fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
2401                where E: serde::de::Error {
2402                    Ok(RawLiteralEnum::Long(v as i64))
2403                }
2404
2405                fn visit_f32<E>(self, v: f32) -> Result<Self::Value, E>
2406                where E: serde::de::Error {
2407                    Ok(RawLiteralEnum::Float(v))
2408                }
2409
2410                fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E>
2411                where E: serde::de::Error {
2412                    Ok(RawLiteralEnum::Double(v))
2413                }
2414
2415                fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
2416                where E: serde::de::Error {
2417                    Ok(RawLiteralEnum::String(v.to_string()))
2418                }
2419
2420                fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
2421                where E: serde::de::Error {
2422                    Ok(RawLiteralEnum::Bytes(ByteBuf::from(v)))
2423                }
2424
2425                fn visit_borrowed_str<E>(self, v: &'de str) -> Result<Self::Value, E>
2426                where E: serde::de::Error {
2427                    Ok(RawLiteralEnum::String(v.to_string()))
2428                }
2429
2430                fn visit_unit<E>(self) -> Result<Self::Value, E>
2431                where E: serde::de::Error {
2432                    Ok(RawLiteralEnum::Null)
2433                }
2434
2435                fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
2436                where A: serde::de::MapAccess<'de> {
2437                    let mut required = Vec::new();
2438                    while let Some(key) = map.next_key::<String>()? {
2439                        let value = map.next_value::<RawLiteralEnum>()?;
2440                        required.push((key, value));
2441                    }
2442                    Ok(RawLiteralEnum::Record(Record {
2443                        required,
2444                        optional: Vec::new(),
2445                    }))
2446                }
2447
2448                fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
2449                where A: serde::de::SeqAccess<'de> {
2450                    let mut list = Vec::new();
2451                    while let Some(value) = seq.next_element::<RawLiteralEnum>()? {
2452                        list.push(Some(value));
2453                    }
2454                    Ok(RawLiteralEnum::List(List {
2455                        list,
2456                        // `required` only used in serialize, just set default in deserialize.
2457                        required: false,
2458                    }))
2459                }
2460            }
2461            deserializer.deserialize_any(RawLiteralVisitor)
2462        }
2463    }
2464
2465    impl RawLiteralEnum {
2466        pub fn try_from(literal: Literal, ty: &Type) -> Result<Self, Error> {
2467            let raw = match literal {
2468                Literal::Primitive(prim) => match prim {
2469                    super::PrimitiveLiteral::Boolean(v) => RawLiteralEnum::Boolean(v),
2470                    super::PrimitiveLiteral::Int(v) => RawLiteralEnum::Int(v),
2471                    super::PrimitiveLiteral::Long(v) => RawLiteralEnum::Long(v),
2472                    super::PrimitiveLiteral::Float(v) => RawLiteralEnum::Float(v.0),
2473                    super::PrimitiveLiteral::Double(v) => RawLiteralEnum::Double(v.0),
2474                    super::PrimitiveLiteral::String(v) => RawLiteralEnum::String(v),
2475                    super::PrimitiveLiteral::UInt128(v) => {
2476                        RawLiteralEnum::Bytes(ByteBuf::from(v.to_be_bytes()))
2477                    }
2478                    super::PrimitiveLiteral::Binary(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)),
2479                    super::PrimitiveLiteral::Int128(v) => {
2480                        RawLiteralEnum::Bytes(ByteBuf::from(v.to_be_bytes()))
2481                    }
2482                    super::PrimitiveLiteral::AboveMax | super::PrimitiveLiteral::BelowMin => {
2483                        return Err(Error::new(
2484                            ErrorKind::DataInvalid,
2485                            "Can't convert AboveMax or BelowMax",
2486                        ));
2487                    }
2488                },
2489                Literal::Struct(r#struct) => {
2490                    let mut required = Vec::new();
2491                    let mut optional = Vec::new();
2492                    if let Type::Struct(struct_ty) = ty {
2493                        for (value, field) in r#struct.into_iter().zip(struct_ty.fields()) {
2494                            if field.required {
2495                                if let Some(value) = value {
2496                                    required.push((
2497                                        field.name.clone(),
2498                                        RawLiteralEnum::try_from(value, &field.field_type)?,
2499                                    ));
2500                                } else {
2501                                    return Err(Error::new(
2502                                        ErrorKind::DataInvalid,
2503                                        "Can't convert null to required field",
2504                                    ));
2505                                }
2506                            } else if let Some(value) = value {
2507                                optional.push((
2508                                    field.name.clone(),
2509                                    Some(RawLiteralEnum::try_from(value, &field.field_type)?),
2510                                ));
2511                            } else {
2512                                optional.push((field.name.clone(), None));
2513                            }
2514                        }
2515                    } else {
2516                        return Err(Error::new(
2517                            ErrorKind::DataInvalid,
2518                            format!("Type {} should be a struct", ty),
2519                        ));
2520                    }
2521                    RawLiteralEnum::Record(Record { required, optional })
2522                }
2523                Literal::List(list) => {
2524                    if let Type::List(list_ty) = ty {
2525                        let list = list
2526                            .into_iter()
2527                            .map(|v| {
2528                                v.map(|v| {
2529                                    RawLiteralEnum::try_from(v, &list_ty.element_field.field_type)
2530                                })
2531                                .transpose()
2532                            })
2533                            .collect::<Result<_, Error>>()?;
2534                        RawLiteralEnum::List(List {
2535                            list,
2536                            required: list_ty.element_field.required,
2537                        })
2538                    } else {
2539                        return Err(Error::new(
2540                            ErrorKind::DataInvalid,
2541                            format!("Type {} should be a list", ty),
2542                        ));
2543                    }
2544                }
2545                Literal::Map(map) => {
2546                    if let Type::Map(map_ty) = ty {
2547                        if let Type::Primitive(PrimitiveType::String) = *map_ty.key_field.field_type
2548                        {
2549                            let mut raw = Vec::with_capacity(map.len());
2550                            for (k, v) in map {
2551                                if let Literal::Primitive(PrimitiveLiteral::String(k)) = k {
2552                                    raw.push((
2553                                        k,
2554                                        v.map(|v| {
2555                                            RawLiteralEnum::try_from(
2556                                                v,
2557                                                &map_ty.value_field.field_type,
2558                                            )
2559                                        })
2560                                        .transpose()?,
2561                                    ));
2562                                } else {
2563                                    return Err(Error::new(
2564                                        ErrorKind::DataInvalid,
2565                                        "literal type is inconsistent with type",
2566                                    ));
2567                                }
2568                            }
2569                            RawLiteralEnum::StringMap(StringMap {
2570                                raw,
2571                                required: map_ty.value_field.required,
2572                            })
2573                        } else {
2574                            let list = map.into_iter().map(|(k,v)| {
2575                                let raw_k =
2576                                    RawLiteralEnum::try_from(k, &map_ty.key_field.field_type)?;
2577                                let raw_v = v
2578                                    .map(|v| {
2579                                        RawLiteralEnum::try_from(v, &map_ty.value_field.field_type)
2580                                    })
2581                                    .transpose()?;
2582                                if map_ty.value_field.required {
2583                                    Ok(Some(RawLiteralEnum::Record(Record {
2584                                        required: vec![
2585                                            (MAP_KEY_FIELD_NAME.to_string(), raw_k),
2586                                            (MAP_VALUE_FIELD_NAME.to_string(), raw_v.ok_or_else(||Error::new(ErrorKind::DataInvalid, "Map value is required, value cannot be null"))?),
2587                                        ],
2588                                        optional: vec![],
2589                                    })))
2590                                } else {
2591                                    Ok(Some(RawLiteralEnum::Record(Record {
2592                                        required: vec![
2593                                            (MAP_KEY_FIELD_NAME.to_string(), raw_k),
2594                                        ],
2595                                        optional: vec![
2596                                            (MAP_VALUE_FIELD_NAME.to_string(), raw_v)
2597                                        ],
2598                                    })))
2599                                }
2600                            }).collect::<Result<_, Error>>()?;
2601                            RawLiteralEnum::List(List {
2602                                list,
2603                                required: true,
2604                            })
2605                        }
2606                    } else {
2607                        return Err(Error::new(
2608                            ErrorKind::DataInvalid,
2609                            format!("Type {} should be a map", ty),
2610                        ));
2611                    }
2612                }
2613            };
2614            Ok(raw)
2615        }
2616
2617        pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> {
2618            let invalid_err = |v: &str| {
2619                Error::new(
2620                    ErrorKind::DataInvalid,
2621                    format!(
2622                        "Unable to convert raw literal ({}) fail convert to type {} for: type mismatch",
2623                        v, ty
2624                    ),
2625                )
2626            };
2627            let invalid_err_with_reason = |v: &str, reason: &str| {
2628                Error::new(
2629                    ErrorKind::DataInvalid,
2630                    format!(
2631                        "Unable to convert raw literal ({}) fail convert to type {} for: {}",
2632                        v, ty, reason
2633                    ),
2634                )
2635            };
2636            match self {
2637                RawLiteralEnum::Null => Ok(None),
2638                RawLiteralEnum::Boolean(v) => Ok(Some(Literal::bool(v))),
2639                RawLiteralEnum::Int(v) => match ty {
2640                    Type::Primitive(PrimitiveType::Int) => Ok(Some(Literal::int(v))),
2641                    Type::Primitive(PrimitiveType::Long) => Ok(Some(Literal::long(i64::from(v)))),
2642                    Type::Primitive(PrimitiveType::Date) => Ok(Some(Literal::date(v))),
2643                    _ => Err(invalid_err("int")),
2644                },
2645                RawLiteralEnum::Long(v) => match ty {
2646                    Type::Primitive(PrimitiveType::Int) => Ok(Some(Literal::int(
2647                        i32::try_from(v).map_err(|_| invalid_err("long"))?,
2648                    ))),
2649                    Type::Primitive(PrimitiveType::Date) => Ok(Some(Literal::date(
2650                        i32::try_from(v).map_err(|_| invalid_err("long"))?,
2651                    ))),
2652                    Type::Primitive(PrimitiveType::Long) => Ok(Some(Literal::long(v))),
2653                    Type::Primitive(PrimitiveType::Time) => Ok(Some(Literal::time(v))),
2654                    Type::Primitive(PrimitiveType::Timestamp) => Ok(Some(Literal::timestamp(v))),
2655                    Type::Primitive(PrimitiveType::Timestamptz) => {
2656                        Ok(Some(Literal::timestamptz(v)))
2657                    }
2658                    _ => Err(invalid_err("long")),
2659                },
2660                RawLiteralEnum::Float(v) => match ty {
2661                    Type::Primitive(PrimitiveType::Float) => Ok(Some(Literal::float(v))),
2662                    Type::Primitive(PrimitiveType::Double) => {
2663                        Ok(Some(Literal::double(f64::from(v))))
2664                    }
2665                    _ => Err(invalid_err("float")),
2666                },
2667                RawLiteralEnum::Double(v) => match ty {
2668                    Type::Primitive(PrimitiveType::Float) => {
2669                        let v_32 = v as f32;
2670                        if v_32.is_finite() {
2671                            let v_64 = f64::from(v_32);
2672                            if (v_64 - v).abs() > f32::EPSILON as f64 {
2673                                // there is a precision loss
2674                                return Err(invalid_err("double"));
2675                            }
2676                        }
2677                        Ok(Some(Literal::float(v_32)))
2678                    }
2679                    Type::Primitive(PrimitiveType::Double) => Ok(Some(Literal::double(v))),
2680                    _ => Err(invalid_err("double")),
2681                },
2682                RawLiteralEnum::String(v) => match ty {
2683                    Type::Primitive(PrimitiveType::String) => Ok(Some(Literal::string(v))),
2684                    _ => Err(invalid_err("string")),
2685                },
2686                // # TODO:https://github.com/apache/iceberg-rust/issues/86
2687                // rust avro don't support deserialize any bytes representation now.
2688                RawLiteralEnum::Bytes(_) => Err(invalid_err_with_reason(
2689                    "bytes",
2690                    "todo: rust avro doesn't support deserialize any bytes representation now",
2691                )),
2692                RawLiteralEnum::List(v) => match ty {
2693                    Type::List(ty) => Ok(Some(Literal::List(
2694                        v.list
2695                            .into_iter()
2696                            .map(|v| {
2697                                if let Some(v) = v {
2698                                    v.try_into(&ty.element_field.field_type)
2699                                } else {
2700                                    Ok(None)
2701                                }
2702                            })
2703                            .collect::<Result<_, Error>>()?,
2704                    ))),
2705                    Type::Map(map_ty) => {
2706                        let key_ty = map_ty.key_field.field_type.as_ref();
2707                        let value_ty = map_ty.value_field.field_type.as_ref();
2708                        let mut map = Map::new();
2709                        for k_v in v.list {
2710                            let k_v = k_v.ok_or_else(|| invalid_err_with_reason("list","In deserialize, None will be represented as Some(RawLiteral::Null), all element in list must be valid"))?;
2711                            if let RawLiteralEnum::Record(Record {
2712                                required,
2713                                optional: _,
2714                            }) = k_v
2715                            {
2716                                if required.len() != 2 {
2717                                    return Err(invalid_err_with_reason(
2718                                        "list",
2719                                        "Record must contains two element(key and value) of array",
2720                                    ));
2721                                }
2722                                let mut key = None;
2723                                let mut value = None;
2724                                required.into_iter().for_each(|(k, v)| {
2725                                    if k == MAP_KEY_FIELD_NAME {
2726                                        key = Some(v);
2727                                    } else if k == MAP_VALUE_FIELD_NAME {
2728                                        value = Some(v);
2729                                    }
2730                                });
2731                                match (key, value) {
2732                                    (Some(k), Some(v)) => {
2733                                        let key = k.try_into(key_ty)?.ok_or_else(|| {
2734                                            invalid_err_with_reason(
2735                                                "list",
2736                                                "Key element in Map must be valid",
2737                                            )
2738                                        })?;
2739                                        let value = v.try_into(value_ty)?;
2740                                        if map_ty.value_field.required && value.is_none() {
2741                                            return Err(invalid_err_with_reason(
2742                                                "list",
2743                                                "Value element is required in this Map",
2744                                            ));
2745                                        }
2746                                        map.insert(key, value);
2747                                    }
2748                                    _ => {
2749                                        return Err(invalid_err_with_reason(
2750                                            "list",
2751                                            "The elements of record in list are not key and value",
2752                                        ));
2753                                    }
2754                                }
2755                            } else {
2756                                return Err(invalid_err_with_reason(
2757                                    "list",
2758                                    "Map should represented as record array.",
2759                                ));
2760                            }
2761                        }
2762                        Ok(Some(Literal::Map(map)))
2763                    }
2764                    Type::Primitive(PrimitiveType::Uuid) => {
2765                        if v.list.len() != 16 {
2766                            return Err(invalid_err_with_reason(
2767                                "list",
2768                                "The length of list should be 16",
2769                            ));
2770                        }
2771                        let mut bytes = [0u8; 16];
2772                        for (i, v) in v.list.iter().enumerate() {
2773                            if let Some(RawLiteralEnum::Long(v)) = v {
2774                                bytes[i] = *v as u8;
2775                            } else {
2776                                return Err(invalid_err_with_reason(
2777                                    "list",
2778                                    "The element of list should be int",
2779                                ));
2780                            }
2781                        }
2782                        Ok(Some(Literal::uuid(uuid::Uuid::from_bytes(bytes))))
2783                    }
2784                    Type::Primitive(PrimitiveType::Decimal {
2785                        precision: _,
2786                        scale: _,
2787                    }) => {
2788                        if v.list.len() != 16 {
2789                            return Err(invalid_err_with_reason(
2790                                "list",
2791                                "The length of list should be 16",
2792                            ));
2793                        }
2794                        let mut bytes = [0u8; 16];
2795                        for (i, v) in v.list.iter().enumerate() {
2796                            if let Some(RawLiteralEnum::Long(v)) = v {
2797                                bytes[i] = *v as u8;
2798                            } else {
2799                                return Err(invalid_err_with_reason(
2800                                    "list",
2801                                    "The element of list should be int",
2802                                ));
2803                            }
2804                        }
2805                        Ok(Some(Literal::decimal(i128::from_be_bytes(bytes))))
2806                    }
2807                    Type::Primitive(PrimitiveType::Binary) => {
2808                        let bytes = v
2809                            .list
2810                            .into_iter()
2811                            .map(|v| {
2812                                if let Some(RawLiteralEnum::Long(v)) = v {
2813                                    Ok(v as u8)
2814                                } else {
2815                                    Err(invalid_err_with_reason(
2816                                        "list",
2817                                        "The element of list should be int",
2818                                    ))
2819                                }
2820                            })
2821                            .collect::<Result<Vec<_>, Error>>()?;
2822                        Ok(Some(Literal::binary(bytes)))
2823                    }
2824                    Type::Primitive(PrimitiveType::Fixed(size)) => {
2825                        if v.list.len() != *size as usize {
2826                            return Err(invalid_err_with_reason(
2827                                "list",
2828                                "The length of list should be equal to size",
2829                            ));
2830                        }
2831                        let bytes = v
2832                            .list
2833                            .into_iter()
2834                            .map(|v| {
2835                                if let Some(RawLiteralEnum::Long(v)) = v {
2836                                    Ok(v as u8)
2837                                } else {
2838                                    Err(invalid_err_with_reason(
2839                                        "list",
2840                                        "The element of list should be int",
2841                                    ))
2842                                }
2843                            })
2844                            .collect::<Result<Vec<_>, Error>>()?;
2845                        Ok(Some(Literal::fixed(bytes)))
2846                    }
2847                    _ => Err(invalid_err("list")),
2848                },
2849                RawLiteralEnum::Record(Record {
2850                    required,
2851                    optional: _,
2852                }) => match ty {
2853                    Type::Struct(struct_ty) => {
2854                        let iters: Vec<Option<Literal>> = required
2855                            .into_iter()
2856                            .map(|(field_name, value)| {
2857                                let field = struct_ty
2858                                    .field_by_name(field_name.as_str())
2859                                    .ok_or_else(|| {
2860                                        invalid_err_with_reason(
2861                                            "record",
2862                                            &format!("field {} is not exist", &field_name),
2863                                        )
2864                                    })?;
2865                                let value = value.try_into(&field.field_type)?;
2866                                Ok(value)
2867                            })
2868                            .collect::<Result<_, Error>>()?;
2869                        Ok(Some(Literal::Struct(super::Struct::from_iter(iters))))
2870                    }
2871                    Type::Map(map_ty) => {
2872                        if *map_ty.key_field.field_type != Type::Primitive(PrimitiveType::String) {
2873                            return Err(invalid_err_with_reason(
2874                                "record",
2875                                "Map key must be string",
2876                            ));
2877                        }
2878                        let mut map = Map::new();
2879                        for (k, v) in required {
2880                            let value = v.try_into(&map_ty.value_field.field_type)?;
2881                            if map_ty.value_field.required && value.is_none() {
2882                                return Err(invalid_err_with_reason(
2883                                    "record",
2884                                    "Value element is required in this Map",
2885                                ));
2886                            }
2887                            map.insert(Literal::string(k), value);
2888                        }
2889                        Ok(Some(Literal::Map(map)))
2890                    }
2891                    _ => Err(invalid_err("record")),
2892                },
2893                RawLiteralEnum::StringMap(_) => Err(invalid_err("string map")),
2894            }
2895        }
2896    }
2897}
2898
2899#[cfg(test)]
2900mod tests {
2901    use apache_avro::to_value;
2902    use apache_avro::types::Value;
2903
2904    use super::*;
2905    use crate::avro::schema_to_avro_schema;
2906    use crate::spec::Schema;
2907    use crate::spec::Type::Primitive;
2908    use crate::spec::datatypes::{ListType, MapType, NestedField, StructType};
2909
2910    fn check_json_serde(json: &str, expected_literal: Literal, expected_type: &Type) {
2911        let raw_json_value = serde_json::from_str::<JsonValue>(json).unwrap();
2912        let desered_literal =
2913            Literal::try_from_json(raw_json_value.clone(), expected_type).unwrap();
2914        assert_eq!(desered_literal, Some(expected_literal.clone()));
2915
2916        let expected_json_value: JsonValue = expected_literal.try_into_json(expected_type).unwrap();
2917        let sered_json = serde_json::to_string(&expected_json_value).unwrap();
2918        let parsed_json_value = serde_json::from_str::<JsonValue>(&sered_json).unwrap();
2919
2920        assert_eq!(parsed_json_value, raw_json_value);
2921    }
2922
2923    fn check_avro_bytes_serde(
2924        input: Vec<u8>,
2925        expected_datum: Datum,
2926        expected_type: &PrimitiveType,
2927    ) {
2928        let raw_schema = r#""bytes""#;
2929        let schema = apache_avro::Schema::parse_str(raw_schema).unwrap();
2930
2931        let bytes = ByteBuf::from(input);
2932        let datum = Datum::try_from_bytes(&bytes, expected_type.clone()).unwrap();
2933        assert_eq!(datum, expected_datum);
2934
2935        let mut writer = apache_avro::Writer::new(&schema, Vec::new());
2936        writer.append_ser(datum.to_bytes().unwrap()).unwrap();
2937        let encoded = writer.into_inner().unwrap();
2938        let reader = apache_avro::Reader::with_schema(&schema, &*encoded).unwrap();
2939
2940        for record in reader {
2941            let result = apache_avro::from_value::<ByteBuf>(&record.unwrap()).unwrap();
2942            let desered_datum = Datum::try_from_bytes(&result, expected_type.clone()).unwrap();
2943            assert_eq!(desered_datum, expected_datum);
2944        }
2945    }
2946
2947    fn check_convert_with_avro(expected_literal: Literal, expected_type: &Type) {
2948        let fields = vec![NestedField::required(1, "col", expected_type.clone()).into()];
2949        let schema = Schema::builder()
2950            .with_fields(fields.clone())
2951            .build()
2952            .unwrap();
2953        let avro_schema = schema_to_avro_schema("test", &schema).unwrap();
2954        let struct_type = Type::Struct(StructType::new(fields));
2955        let struct_literal =
2956            Literal::Struct(Struct::from_iter(vec![Some(expected_literal.clone())]));
2957
2958        let mut writer = apache_avro::Writer::new(&avro_schema, Vec::new());
2959        let raw_literal = RawLiteral::try_from(struct_literal.clone(), &struct_type).unwrap();
2960        writer.append_ser(raw_literal).unwrap();
2961        let encoded = writer.into_inner().unwrap();
2962
2963        let reader = apache_avro::Reader::new(&*encoded).unwrap();
2964        for record in reader {
2965            let result = apache_avro::from_value::<RawLiteral>(&record.unwrap()).unwrap();
2966            let desered_literal = result.try_into(&struct_type).unwrap().unwrap();
2967            assert_eq!(desered_literal, struct_literal);
2968        }
2969    }
2970
2971    fn check_serialize_avro(literal: Literal, ty: &Type, expect_value: Value) {
2972        let expect_value = Value::Record(vec![("col".to_string(), expect_value)]);
2973
2974        let fields = vec![NestedField::required(1, "col", ty.clone()).into()];
2975        let schema = Schema::builder()
2976            .with_fields(fields.clone())
2977            .build()
2978            .unwrap();
2979        let avro_schema = schema_to_avro_schema("test", &schema).unwrap();
2980        let struct_type = Type::Struct(StructType::new(fields));
2981        let struct_literal = Literal::Struct(Struct::from_iter(vec![Some(literal.clone())]));
2982        let mut writer = apache_avro::Writer::new(&avro_schema, Vec::new());
2983        let raw_literal = RawLiteral::try_from(struct_literal.clone(), &struct_type).unwrap();
2984        let value = to_value(raw_literal)
2985            .unwrap()
2986            .resolve(&avro_schema)
2987            .unwrap();
2988        writer.append_value_ref(&value).unwrap();
2989        let encoded = writer.into_inner().unwrap();
2990
2991        let reader = apache_avro::Reader::new(&*encoded).unwrap();
2992        for record in reader {
2993            assert_eq!(record.unwrap(), expect_value);
2994        }
2995    }
2996
/// A JSON `true` round-trips as a boolean literal.
#[test]
fn json_boolean() {
    let ty = Type::Primitive(PrimitiveType::Boolean);
    let expected = Literal::Primitive(PrimitiveLiteral::Boolean(true));

    check_json_serde(r#"true"#, expected, &ty);
}
3007
/// A JSON integer round-trips as an int literal.
#[test]
fn json_int() {
    let ty = Type::Primitive(PrimitiveType::Int);
    let expected = Literal::Primitive(PrimitiveLiteral::Int(32));

    check_json_serde(r#"32"#, expected, &ty);
}
3018
/// Runs `check_json_serde` for the JSON text `32` and a long literal.
#[test]
fn json_long() {
    let expected = Literal::Primitive(PrimitiveLiteral::Long(32));
    let ty = Type::Primitive(PrimitiveType::Long);

    check_json_serde("32", expected, &ty);
}
3029
/// Runs `check_json_serde` for the JSON text `1.0` and a float literal.
#[test]
fn json_float() {
    let expected = Literal::Primitive(PrimitiveLiteral::Float(OrderedFloat(1.0)));
    let ty = Type::Primitive(PrimitiveType::Float);

    check_json_serde("1.0", expected, &ty);
}
3040
/// Runs `check_json_serde` for the JSON text `1.0` and a double literal.
#[test]
fn json_double() {
    let expected = Literal::Primitive(PrimitiveLiteral::Double(OrderedFloat(1.0)));
    let ty = Type::Primitive(PrimitiveType::Double);

    check_json_serde("1.0", expected, &ty);
}
3051
/// Runs `check_json_serde` for the JSON string `"2017-11-16"` and the date
/// literal stored as `Int(17486)`.
#[test]
fn json_date() {
    let expected = Literal::Primitive(PrimitiveLiteral::Int(17486));
    let ty = Type::Primitive(PrimitiveType::Date);

    check_json_serde(r#""2017-11-16""#, expected, &ty);
}
3062
/// Runs `check_json_serde` for the JSON string `"22:31:08.123456"` and the
/// time literal stored as `Long(81068123456)`.
#[test]
fn json_time() {
    let expected = Literal::Primitive(PrimitiveLiteral::Long(81068123456));
    let ty = Type::Primitive(PrimitiveType::Time);

    check_json_serde(r#""22:31:08.123456""#, expected, &ty);
}
3073
/// Runs `check_json_serde` for a zone-less timestamp string and the timestamp
/// literal stored as `Long(1510871468123456)`.
#[test]
fn json_timestamp() {
    let expected = Literal::Primitive(PrimitiveLiteral::Long(1510871468123456));
    let ty = Type::Primitive(PrimitiveType::Timestamp);

    check_json_serde(r#""2017-11-16T22:31:08.123456""#, expected, &ty);
}
3084
/// Runs `check_json_serde` for a `+00:00` timestamp string and the
/// timestamptz literal stored as `Long(1510871468123456)`.
#[test]
fn json_timestamptz() {
    let expected = Literal::Primitive(PrimitiveLiteral::Long(1510871468123456));
    let ty = Type::Primitive(PrimitiveType::Timestamptz);

    check_json_serde(r#""2017-11-16T22:31:08.123456+00:00""#, expected, &ty);
}
3095
/// Runs `check_json_serde` for the JSON string `"iceberg"` and a string literal.
#[test]
fn json_string() {
    let expected = Literal::Primitive(PrimitiveLiteral::String("iceberg".to_string()));
    let ty = Type::Primitive(PrimitiveType::String);

    check_json_serde(r#""iceberg""#, expected, &ty);
}
3106
/// Runs `check_json_serde` for a UUID string and the literal holding the same
/// UUID as a `u128`.
#[test]
fn json_uuid() {
    let uuid = Uuid::parse_str("f79c3e09-677c-4bbd-a479-3f349cb785e7").unwrap();
    let expected = Literal::Primitive(PrimitiveLiteral::UInt128(uuid.as_u128()));
    let ty = Type::Primitive(PrimitiveType::Uuid);

    check_json_serde(r#""f79c3e09-677c-4bbd-a479-3f349cb785e7""#, expected, &ty);
}
3121
/// Runs `check_json_serde` for the JSON string `"14.20"` and the literal
/// `Int128(1420)` under a `decimal(28, 2)` type.
#[test]
fn json_decimal() {
    let expected = Literal::Primitive(PrimitiveLiteral::Int128(1420));
    let ty = Type::decimal(28, 2).unwrap();

    check_json_serde(r#""14.20""#, expected, &ty);
}
3132
/// Runs `check_json_serde` for a JSON object keyed by field id and a
/// three-field struct literal whose last field is null.
#[test]
fn json_struct() {
    let struct_type = Type::Struct(StructType::new(vec![
        NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
        NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
        NestedField::optional(3, "address", Type::Primitive(PrimitiveType::String)).into(),
    ]));

    let id = Literal::Primitive(PrimitiveLiteral::Int(1));
    let name = Literal::Primitive(PrimitiveLiteral::String("bar".to_string()));
    let expected = Literal::Struct(Struct::from_iter(vec![Some(id), Some(name), None]));

    check_json_serde(r#"{"1": 1, "2": "bar", "3": null}"#, expected, &struct_type);
}
3153
/// Runs `check_json_serde` for a JSON array with a trailing `null` and the
/// matching list-of-int literal.
#[test]
fn json_list() {
    let int = |v: i32| Some(Literal::Primitive(PrimitiveLiteral::Int(v)));

    let element_field =
        NestedField::list_element(0, Type::Primitive(PrimitiveType::Int), true).into();
    let list_type = Type::List(ListType { element_field });
    let expected = Literal::List(vec![int(1), int(2), int(3), None]);

    check_json_serde(r#"[1, 2, 3, null]"#, expected, &list_type);
}
3176
/// Runs `check_json_serde` for a JSON object with parallel `keys`/`values`
/// arrays and the matching string-to-int map literal (one value is null).
#[test]
fn json_map() {
    let key = |k: &str| Literal::Primitive(PrimitiveLiteral::String(k.to_string()));
    let value = |v: i32| Some(Literal::Primitive(PrimitiveLiteral::Int(v)));

    let key_field =
        NestedField::map_key_element(0, Type::Primitive(PrimitiveType::String)).into();
    let value_field =
        NestedField::map_value_element(1, Type::Primitive(PrimitiveType::Int), true).into();
    let map_type = Type::Map(MapType {
        key_field,
        value_field,
    });

    let expected = Literal::Map(Map::from([
        (key("a"), value(1)),
        (key("b"), value(2)),
        (key("c"), None),
    ]));

    check_json_serde(
        r#"{ "keys": ["a", "b", "c"], "values": [1, 2, null] }"#,
        expected,
        &map_type,
    );
}
3209
/// Runs `check_avro_bytes_serde` for the single byte `1` and `Datum::bool(true)`.
#[test]
fn avro_bytes_boolean() {
    check_avro_bytes_serde(vec![0x01u8], Datum::bool(true), &PrimitiveType::Boolean);
}
3216
/// Runs `check_avro_bytes_serde` for the four bytes `[32, 0, 0, 0]` and `Datum::int(32)`.
#[test]
fn avro_bytes_int() {
    let encoded: Vec<u8> = vec![0x20, 0x00, 0x00, 0x00];

    check_avro_bytes_serde(encoded, Datum::int(32), &PrimitiveType::Int);
}
3223
/// Runs `check_avro_bytes_serde` for the eight bytes `[32, 0, …, 0]` and `Datum::long(32)`.
#[test]
fn avro_bytes_long() {
    let encoded: Vec<u8> = vec![0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00];

    check_avro_bytes_serde(encoded, Datum::long(32), &PrimitiveType::Long);
}
3230
/// Runs `check_avro_bytes_serde` with a four-byte input against a long datum
/// and the `Long` type.
#[test]
fn avro_bytes_long_from_int() {
    let int_sized: Vec<u8> = vec![0x20, 0x00, 0x00, 0x00];

    check_avro_bytes_serde(int_sized, Datum::long(32), &PrimitiveType::Long);
}
3237
/// Runs `check_avro_bytes_serde` for the bytes `[0, 0, 128, 63]` and `Datum::float(1.0)`.
#[test]
fn avro_bytes_float() {
    let encoded: Vec<u8> = vec![0x00, 0x00, 0x80, 0x3f];

    check_avro_bytes_serde(encoded, Datum::float(1.0), &PrimitiveType::Float);
}
3244
/// Runs `check_avro_bytes_serde` for the bytes `[0, 0, 0, 0, 0, 0, 240, 63]`
/// and `Datum::double(1.0)`.
#[test]
fn avro_bytes_double() {
    let encoded: Vec<u8> = vec![0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xf0, 0x3f];

    check_avro_bytes_serde(encoded, Datum::double(1.0), &PrimitiveType::Double);
}
3251
/// Runs `check_avro_bytes_serde` with a four-byte input against a double datum
/// and the `Double` type.
#[test]
fn avro_bytes_double_from_float() {
    let float_sized: Vec<u8> = vec![0x00, 0x00, 0x80, 0x3f];

    check_avro_bytes_serde(float_sized, Datum::double(1.0), &PrimitiveType::Double);
}
3258
/// Runs `check_avro_bytes_serde` for the bytes of `iceberg` and `Datum::string("iceberg")`.
#[test]
fn avro_bytes_string() {
    // Same seven bytes as [105, 99, 101, 98, 101, 114, 103].
    let encoded = b"iceberg".to_vec();

    check_avro_bytes_serde(encoded, Datum::string("iceberg"), &PrimitiveType::String);
}
3265
/// Runs `check_avro_bytes_serde` for several decimal byte inputs, covering
/// positive and negative values at different scales and precisions.
#[test]
fn avro_bytes_decimal() {
    // Each case is (input bytes, unscaled value, scale, precision).
    let cases = vec![
        (vec![4u8, 210u8], 1234, 2, 38),
        (vec![251u8, 46u8], -1234, 2, 38),
        (vec![4u8, 210u8], 1234, 3, 38),
        (vec![251u8, 46u8], -1234, 3, 38),
        (vec![42u8], 42, 2, 1),
        (vec![214u8], -42, 2, 1),
    ];

    for (bytes, unscaled, scale, precision) in cases {
        let expected =
            Datum::decimal_with_precision(Decimal::new(unscaled, scale), precision).unwrap();
        let ty = PrimitiveType::Decimal { precision, scale };

        check_avro_bytes_serde(bytes, expected, &ty);
    }
}
3293
/// Asserts that `Datum::decimal_with_precision` returns a `DataInvalid` error
/// for each listed (unscaled value, scale, precision) combination.
#[test]
fn avro_bytes_decimal_expect_error() {
    // Each case is (unscaled value, scale, precision).
    let invalid_cases = vec![(1234, 2, 1)];

    for (unscaled, scale, precision) in invalid_cases {
        let outcome = Datum::decimal_with_precision(Decimal::new(unscaled, scale), precision);

        assert!(outcome.is_err(), "expect error but got {:?}", outcome);
        let kind = outcome.unwrap_err().kind();
        assert_eq!(kind, ErrorKind::DataInvalid, "expect error DataInvalid",);
    }
}
3312
/// Runs `check_convert_with_avro` for an int literal.
#[test]
fn avro_convert_test_int() {
    let literal = Literal::Primitive(PrimitiveLiteral::Int(32));
    let ty = Type::Primitive(PrimitiveType::Int);

    check_convert_with_avro(literal, &ty);
}
3320
/// Runs `check_convert_with_avro` for a long literal.
#[test]
fn avro_convert_test_long() {
    let literal = Literal::Primitive(PrimitiveLiteral::Long(32));
    let ty = Type::Primitive(PrimitiveType::Long);

    check_convert_with_avro(literal, &ty);
}
3328
/// Runs `check_convert_with_avro` for a float literal.
#[test]
fn avro_convert_test_float() {
    let literal = Literal::Primitive(PrimitiveLiteral::Float(OrderedFloat(1.0)));
    let ty = Type::Primitive(PrimitiveType::Float);

    check_convert_with_avro(literal, &ty);
}
3336
/// Runs `check_convert_with_avro` for a double literal.
#[test]
fn avro_convert_test_double() {
    let literal = Literal::Primitive(PrimitiveLiteral::Double(OrderedFloat(1.0)));
    let ty = Type::Primitive(PrimitiveType::Double);

    check_convert_with_avro(literal, &ty);
}
3344
/// Runs `check_convert_with_avro` for a string literal.
#[test]
fn avro_convert_test_string() {
    let literal = Literal::Primitive(PrimitiveLiteral::String("iceberg".to_string()));
    let ty = Type::Primitive(PrimitiveType::String);

    check_convert_with_avro(literal, &ty);
}
3352
/// Runs `check_convert_with_avro` for a date stored as `Int(17486)`.
#[test]
fn avro_convert_test_date() {
    let literal = Literal::Primitive(PrimitiveLiteral::Int(17486));
    let ty = Type::Primitive(PrimitiveType::Date);

    check_convert_with_avro(literal, &ty);
}
3360
/// Runs `check_convert_with_avro` for a time stored as `Long(81068123456)`.
#[test]
fn avro_convert_test_time() {
    let literal = Literal::Primitive(PrimitiveLiteral::Long(81068123456));
    let ty = Type::Primitive(PrimitiveType::Time);

    check_convert_with_avro(literal, &ty);
}
3368
/// Runs `check_convert_with_avro` for a timestamp stored as `Long(1510871468123456)`.
#[test]
fn avro_convert_test_timestamp() {
    let literal = Literal::Primitive(PrimitiveLiteral::Long(1510871468123456));
    let ty = Type::Primitive(PrimitiveType::Timestamp);

    check_convert_with_avro(literal, &ty);
}
3376
/// Runs `check_convert_with_avro` for a timestamptz stored as `Long(1510871468123456)`.
#[test]
fn avro_convert_test_timestamptz() {
    let literal = Literal::Primitive(PrimitiveLiteral::Long(1510871468123456));
    let ty = Type::Primitive(PrimitiveType::Timestamptz);

    check_convert_with_avro(literal, &ty);
}
3384
/// Runs `check_convert_with_avro` for two int lists: one containing a null
/// with the element flag `false`, and one without nulls with the flag `true`.
#[test]
fn avro_convert_test_list() {
    let int = |v: i32| Some(Literal::Primitive(PrimitiveLiteral::Int(v)));
    // Builds the list type, forwarding the flag to `NestedField::list_element`.
    let int_list_type = |element_flag: bool| {
        Type::List(ListType {
            element_field: NestedField::list_element(
                0,
                Type::Primitive(PrimitiveType::Int),
                element_flag,
            )
            .into(),
        })
    };

    check_convert_with_avro(
        Literal::List(vec![int(1), int(2), int(3), None]),
        &int_list_type(false),
    );

    check_convert_with_avro(
        Literal::List(vec![int(1), int(2), int(3)]),
        &int_list_type(true),
    );
}
3420
3421    fn check_convert_with_avro_map(expected_literal: Literal, expected_type: &Type) {
3422        let fields = vec![NestedField::required(1, "col", expected_type.clone()).into()];
3423        let schema = Schema::builder()
3424            .with_fields(fields.clone())
3425            .build()
3426            .unwrap();
3427        let avro_schema = schema_to_avro_schema("test", &schema).unwrap();
3428        let struct_type = Type::Struct(StructType::new(fields));
3429        let struct_literal =
3430            Literal::Struct(Struct::from_iter(vec![Some(expected_literal.clone())]));
3431
3432        let mut writer = apache_avro::Writer::new(&avro_schema, Vec::new());
3433        let raw_literal = RawLiteral::try_from(struct_literal.clone(), &struct_type).unwrap();
3434        writer.append_ser(raw_literal).unwrap();
3435        let encoded = writer.into_inner().unwrap();
3436
3437        let reader = apache_avro::Reader::new(&*encoded).unwrap();
3438        for record in reader {
3439            let result = apache_avro::from_value::<RawLiteral>(&record.unwrap()).unwrap();
3440            let desered_literal = result.try_into(&struct_type).unwrap().unwrap();
3441            match (&desered_literal, &struct_literal) {
3442                (Literal::Struct(desered), Literal::Struct(expected)) => {
3443                    match (&desered.fields[0], &expected.fields[0]) {
3444                        (Some(Literal::Map(desered)), Some(Literal::Map(expected))) => {
3445                            assert!(desered.has_same_content(expected))
3446                        }
3447                        _ => {
3448                            unreachable!()
3449                        }
3450                    }
3451                }
3452                _ => {
3453                    panic!("unexpected literal type");
3454                }
3455            }
3456        }
3457    }
3458
/// Runs `check_convert_with_avro_map` for two int-to-long maps: one with a
/// null value and the value flag `false`, one fully populated with the flag
/// `true`.
#[test]
fn avro_convert_test_map() {
    let key = |k: i32| Literal::Primitive(PrimitiveLiteral::Int(k));
    let value = |v: i64| Some(Literal::Primitive(PrimitiveLiteral::Long(v)));
    // Builds the map type, forwarding the flag to `NestedField::map_value_element`.
    let map_type = |value_flag: bool| {
        Type::Map(MapType {
            key_field: NestedField::map_key_element(2, Type::Primitive(PrimitiveType::Int))
                .into(),
            value_field: NestedField::map_value_element(
                3,
                Type::Primitive(PrimitiveType::Long),
                value_flag,
            )
            .into(),
        })
    };

    check_convert_with_avro_map(
        Literal::Map(Map::from([
            (key(1), value(1)),
            (key(2), value(2)),
            (key(3), None),
        ])),
        &map_type(false),
    );

    check_convert_with_avro_map(
        Literal::Map(Map::from([
            (key(1), value(1)),
            (key(2), value(2)),
            (key(3), value(3)),
        ])),
        &map_type(true),
    );
}
3512
/// Runs `check_convert_with_avro_map` for two string-to-int maps: one with a
/// null value and the value flag `false`, one fully populated with the flag
/// `true`.
#[test]
fn avro_convert_test_string_map() {
    let key = |k: &str| Literal::Primitive(PrimitiveLiteral::String(k.to_string()));
    let value = |v: i32| Some(Literal::Primitive(PrimitiveLiteral::Int(v)));
    // Builds the map type, forwarding the flag to `NestedField::map_value_element`.
    let map_type = |value_flag: bool| {
        Type::Map(MapType {
            key_field: NestedField::map_key_element(2, Type::Primitive(PrimitiveType::String))
                .into(),
            value_field: NestedField::map_value_element(
                3,
                Type::Primitive(PrimitiveType::Int),
                value_flag,
            )
            .into(),
        })
    };

    check_convert_with_avro_map(
        Literal::Map(Map::from([
            (key("a"), value(1)),
            (key("b"), value(2)),
            (key("c"), None),
        ])),
        &map_type(false),
    );

    check_convert_with_avro_map(
        Literal::Map(Map::from([
            (key("a"), value(1)),
            (key("b"), value(2)),
            (key("c"), value(3)),
        ])),
        &map_type(true),
    );
}
3569
/// Runs `check_convert_with_avro` for a three-field struct literal whose last
/// field is null.
#[test]
fn avro_convert_test_record() {
    let record_type = Type::Struct(StructType::new(vec![
        NestedField::required(2, "id", Type::Primitive(PrimitiveType::Int)).into(),
        NestedField::optional(3, "name", Type::Primitive(PrimitiveType::String)).into(),
        NestedField::optional(4, "address", Type::Primitive(PrimitiveType::String)).into(),
    ]));

    let id = Literal::Primitive(PrimitiveLiteral::Int(1));
    let name = Literal::Primitive(PrimitiveLiteral::String("bar".to_string()));
    let record = Literal::Struct(Struct::from_iter(vec![Some(id), Some(name), None]));

    check_convert_with_avro(record, &record_type);
}
3587
// TODO: https://github.com/apache/iceberg-rust/issues/86
// Rust avro doesn't support deserializing any bytes representation yet:
// - binary
// - decimal
/// Runs `check_serialize_avro` for a binary literal, expecting an Avro
/// `Bytes` value with the same bytes.
#[test]
fn avro_convert_test_binary_ser() {
    let payload: Vec<u8> = vec![1, 2, 3, 4, 5];

    check_serialize_avro(
        Literal::Primitive(PrimitiveLiteral::Binary(payload.clone())),
        &Type::Primitive(PrimitiveType::Binary),
        Value::Bytes(payload),
    );
}
3599
/// Runs `check_serialize_avro` for `Literal::decimal(12345)` under a
/// `decimal(9, 8)` type, expecting an Avro `Decimal` built from the
/// big-endian bytes of `12345_i128`.
#[test]
fn avro_convert_test_decimal_ser() {
    let decimal_type = Type::Primitive(PrimitiveType::Decimal {
        precision: 9,
        scale: 8,
    });
    let unscaled_be = 12345_i128.to_be_bytes();
    let expected = Value::Decimal(apache_avro::Decimal::from(unscaled_be));

    check_serialize_avro(Literal::decimal(12345), &decimal_type, expected);
}
3610
// TODO: https://github.com/apache/iceberg-rust/issues/86
// Rust avro can't convert any byte-like type to an Avro fixed yet, so these are not covered:
// - uuid ser/de
// - fixed ser/de
3615
/// Checks `Datum::timestamp_from_str`: zone-less inputs parse and display as
/// expected, while an input with a zone offset and a garbage input are rejected.
#[test]
fn test_parse_timestamp() {
    let with_fraction = Datum::timestamp_from_str("2021-08-01T01:09:00.0899").unwrap();
    assert_eq!(with_fraction.to_string(), "2021-08-01 01:09:00.089900");

    let whole_seconds = Datum::timestamp_from_str("2023-01-06T00:00:00").unwrap();
    assert_eq!(whole_seconds.to_string(), "2023-01-06 00:00:00");

    assert!(
        Datum::timestamp_from_str("2021-08-01T01:09:00.0899+0800").is_err(),
        "Parse timestamp with timezone should fail!"
    );

    assert!(
        Datum::timestamp_from_str("dfa").is_err(),
        "Parse timestamp with invalid input should fail!"
    );
}
3633
/// Checks `Datum::timestamptz_from_str`: an input with a `+0800` offset parses
/// and displays in UTC, while a zone-less input and a garbage input are rejected.
#[test]
fn test_parse_timestamptz() {
    let parsed = Datum::timestamptz_from_str("2021-08-01T09:09:00.0899+0800").unwrap();
    assert_eq!(parsed.to_string(), "2021-08-01 01:09:00.089900 UTC");

    assert!(
        Datum::timestamptz_from_str("2021-08-01T01:09:00.0899").is_err(),
        "Parse timestamptz without timezone should fail!"
    );

    assert!(
        Datum::timestamptz_from_str("dfa").is_err(),
        "Parse timestamptz with invalid input should fail!"
    );
}
3651
/// Checks JSON (de)serialization of `Datum` through `serde_json`.
///
/// Finite values of every covered type must survive a round trip unchanged.
/// For non-finite floats and doubles, serialization succeeds but the produced
/// JSON must fail to deserialize back into a `Datum`.
#[test]
fn test_datum_ser_deser() {
    let assert_round_trips = |datum: Datum| {
        let json = serde_json::to_value(&datum).unwrap();
        let restored: Datum = serde_json::from_value(json).unwrap();
        assert_eq!(datum, restored);
    };
    let assert_not_restorable = |datum: Datum| {
        let json = serde_json::to_string(&datum).unwrap();
        assert!(serde_json::from_str::<Datum>(&json).is_err());
    };

    let integers_and_floats = vec![
        Datum::int(1),
        Datum::long(1),
        Datum::float(1.0),
        Datum::float(0_f32),
        Datum::float(-0_f32),
        Datum::float(f32::MAX),
        Datum::float(f32::MIN),
    ];
    for datum in integers_and_floats {
        assert_round_trips(datum);
    }

    // f32::INFINITY, f32::NEG_INFINITY and f32::NAN cannot be read back.
    let non_finite_floats = vec![
        Datum::float(f32::INFINITY),
        Datum::float(f32::NEG_INFINITY),
        Datum::float(f32::NAN),
    ];
    for datum in non_finite_floats {
        assert_not_restorable(datum);
    }

    let doubles = vec![
        Datum::double(1.0),
        Datum::double(f64::MAX),
        Datum::double(f64::MIN),
    ];
    for datum in doubles {
        assert_round_trips(datum);
    }

    // f64::INFINITY, f64::NEG_INFINITY and f64::NAN cannot be read back.
    let non_finite_doubles = vec![
        Datum::double(f64::INFINITY),
        Datum::double(f64::NEG_INFINITY),
        Datum::double(f64::NAN),
    ];
    for datum in non_finite_doubles {
        assert_not_restorable(datum);
    }

    let remaining = vec![
        Datum::string("iceberg"),
        Datum::bool(true),
        Datum::date(17486),
        Datum::time_from_hms_micro(22, 15, 33, 111).unwrap(),
        Datum::timestamp_micros(1510871468123456),
        Datum::timestamptz_micros(1510871468123456),
        Datum::uuid(Uuid::parse_str("f79c3e09-677c-4bbd-a479-3f349cb785e7").unwrap()),
        Datum::decimal(1420).unwrap(),
        Datum::binary(vec![1, 2, 3, 4, 5]),
        Datum::fixed(vec![1, 2, 3, 4, 5]),
    ];
    for datum in remaining {
        assert_round_trips(datum);
    }
}
3725
/// Converting `Datum::date(12345)` to `Int` must yield `Datum::int(12345)`.
#[test]
fn test_datum_date_convert_to_int() {
    let source = Datum::date(12345);
    let converted = source.to(&Primitive(PrimitiveType::Int)).unwrap();

    assert_eq!(converted, Datum::int(12345));
}
3736
/// Converting `Datum::int(12345)` to `Date` must yield `Datum::date(12345)`.
#[test]
fn test_datum_int_convert_to_date() {
    let source = Datum::int(12345);
    let converted = source.to(&Primitive(PrimitiveType::Date)).unwrap();

    assert_eq!(converted, Datum::date(12345));
}
3747
/// Converting `Datum::long(12345)` to `Int` must yield `Datum::int(12345)`.
#[test]
fn test_datum_long_convert_to_int() {
    let source = Datum::long(12345);
    let converted = source.to(&Primitive(PrimitiveType::Int)).unwrap();

    assert_eq!(converted, Datum::int(12345));
}
3758
/// Converting a long one above `INT_MAX` to `Int` must yield an `Int` datum
/// carrying the `AboveMax` marker.
#[test]
fn test_datum_long_convert_to_int_above_max() {
    let source = Datum::long(INT_MAX as i64 + 1);
    let converted = source.to(&Primitive(PrimitiveType::Int)).unwrap();

    assert_eq!(
        converted,
        Datum::new(PrimitiveType::Int, PrimitiveLiteral::AboveMax)
    );
}
3769
/// Converting a long one below `INT_MIN` to `Int` must yield an `Int` datum
/// carrying the `BelowMin` marker.
#[test]
fn test_datum_long_convert_to_int_below_min() {
    let source = Datum::long(INT_MIN as i64 - 1);
    let converted = source.to(&Primitive(PrimitiveType::Int)).unwrap();

    assert_eq!(
        converted,
        Datum::new(PrimitiveType::Int, PrimitiveLiteral::BelowMin)
    );
}
3780
/// Converting `Datum::long(12345)` to `Timestamp` must yield
/// `Datum::timestamp_micros(12345)`.
#[test]
fn test_datum_long_convert_to_timestamp() {
    let source = Datum::long(12345);
    let converted = source.to(&Primitive(PrimitiveType::Timestamp)).unwrap();

    assert_eq!(converted, Datum::timestamp_micros(12345));
}
3791
/// Converting `Datum::long(12345)` to `Timestamptz` must yield
/// `Datum::timestamptz_micros(12345)`.
#[test]
fn test_datum_long_convert_to_timestamptz() {
    let source = Datum::long(12345);
    let converted = source.to(&Primitive(PrimitiveType::Timestamptz)).unwrap();

    assert_eq!(converted, Datum::timestamptz_micros(12345));
}
3802
/// Converting `Datum::decimal(12345)` to `Long` must yield `Datum::long(12345)`.
#[test]
fn test_datum_decimal_convert_to_long() {
    let source = Datum::decimal(12345).unwrap();
    let converted = source.to(&Primitive(PrimitiveType::Long)).unwrap();

    assert_eq!(converted, Datum::long(12345));
}
3813
/// Converting a decimal one above `LONG_MAX` to `Long` must yield a `Long`
/// datum carrying the `AboveMax` marker.
#[test]
fn test_datum_decimal_convert_to_long_above_max() {
    let source = Datum::decimal(LONG_MAX as i128 + 1).unwrap();
    let converted = source.to(&Primitive(PrimitiveType::Long)).unwrap();

    assert_eq!(
        converted,
        Datum::new(PrimitiveType::Long, PrimitiveLiteral::AboveMax)
    );
}
3824
/// Converting a decimal one below `LONG_MIN` to `Long` must yield a `Long`
/// datum carrying the `BelowMin` marker.
#[test]
fn test_datum_decimal_convert_to_long_below_min() {
    let source = Datum::decimal(LONG_MIN as i128 - 1).unwrap();
    let converted = source.to(&Primitive(PrimitiveType::Long)).unwrap();

    assert_eq!(
        converted,
        Datum::new(PrimitiveType::Long, PrimitiveLiteral::BelowMin)
    );
}
3835
/// Converting the string `"true"` to `Boolean` must yield `Datum::bool(true)`.
#[test]
fn test_datum_string_convert_to_boolean() {
    let source = Datum::string("true");
    let converted = source.to(&Primitive(PrimitiveType::Boolean)).unwrap();

    assert_eq!(converted, Datum::bool(true));
}
3846
/// Converting the string `"12345"` to `Int` must yield `Datum::int(12345)`.
#[test]
fn test_datum_string_convert_to_int() {
    let source = Datum::string("12345");
    let converted = source.to(&Primitive(PrimitiveType::Int)).unwrap();

    assert_eq!(converted, Datum::int(12345));
}
3857
/// Converting the string `"12345"` to `Long` must yield `Datum::long(12345)`.
#[test]
fn test_datum_string_convert_to_long() {
    let source = Datum::string("12345");
    let converted = source.to(&Primitive(PrimitiveType::Long)).unwrap();

    assert_eq!(converted, Datum::long(12345));
}
3868
/// Converting the string `"1925-05-20T19:25:00.000"` to `Timestamp` must yield
/// `Datum::timestamp_micros(-1407990900000000)`.
#[test]
fn test_datum_string_convert_to_timestamp() {
    let source = Datum::string("1925-05-20T19:25:00.000");
    let converted = source.to(&Primitive(PrimitiveType::Timestamp)).unwrap();

    assert_eq!(converted, Datum::timestamp_micros(-1407990900000000));
}
3879
/// Converting the string `"1925-05-20T19:25:00.000 UTC"` to `Timestamptz` must
/// yield `Datum::timestamptz_micros(-1407990900000000)`.
#[test]
fn test_datum_string_convert_to_timestamptz() {
    let source = Datum::string("1925-05-20T19:25:00.000 UTC");
    let converted = source.to(&Primitive(PrimitiveType::Timestamptz)).unwrap();

    assert_eq!(converted, Datum::timestamptz_micros(-1407990900000000));
}
3890
/// Sorts shuffled float and double datums with `partial_cmp` and checks the
/// resulting order: negative NaN first, then negative infinity, the finite
/// values in ascending order with `-0.0` before `0.0`, positive infinity, and
/// positive NaN last.
#[test]
fn test_iceberg_float_order() {
    // Sorts with `Datum::partial_cmp`; panics if any pair is incomparable.
    let sorted = |mut values: Vec<Datum>| {
        values.sort_by(|lhs, rhs| lhs.partial_cmp(rhs).unwrap());
        values
    };

    // Float ordering.
    let float_shuffled = vec![
        Datum::float(f32::NAN),
        Datum::float(-f32::NAN),
        Datum::float(f32::MAX),
        Datum::float(f32::MIN),
        Datum::float(f32::INFINITY),
        Datum::float(-f32::INFINITY),
        Datum::float(1.0),
        Datum::float(-1.0),
        Datum::float(0.0),
        Datum::float(-0.0),
    ];
    let float_expected = vec![
        Datum::float(-f32::NAN),
        Datum::float(-f32::INFINITY),
        Datum::float(f32::MIN),
        Datum::float(-1.0),
        Datum::float(-0.0),
        Datum::float(0.0),
        Datum::float(1.0),
        Datum::float(f32::MAX),
        Datum::float(f32::INFINITY),
        Datum::float(f32::NAN),
    ];

    assert_eq!(sorted(float_shuffled), float_expected);

    // Double ordering.
    let double_shuffled = vec![
        Datum::double(f64::NAN),
        Datum::double(-f64::NAN),
        Datum::double(f64::INFINITY),
        Datum::double(-f64::INFINITY),
        Datum::double(f64::MAX),
        Datum::double(f64::MIN),
        Datum::double(1.0),
        Datum::double(-1.0),
        Datum::double(0.0),
        Datum::double(-0.0),
    ];
    let double_expected = vec![
        Datum::double(-f64::NAN),
        Datum::double(-f64::INFINITY),
        Datum::double(f64::MIN),
        Datum::double(-1.0),
        Datum::double(-0.0),
        Datum::double(0.0),
        Datum::double(1.0),
        Datum::double(f64::MAX),
        Datum::double(f64::INFINITY),
        Datum::double(f64::NAN),
    ];

    assert_eq!(sorted(double_shuffled), double_expected);
}
3957}