1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
// see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
use crate::error::{Error, Result};

use super::{IntegerType, PhysicalType, PrimitiveConvertedType, PrimitiveLogicalType, TimeUnit};

/// Validates that a DECIMAL annotation with the given `precision` and `scale` may be
/// applied to `physical_type`.
///
/// # Errors
/// Returns an `Error::oos` error when:
/// * `precision` is zero;
/// * `scale` is greater than `precision` (the Parquet specification allows them to be equal);
/// * `physical_type` is `Int32` and `precision` is outside `1..=9`;
/// * `physical_type` is `Int64` and `precision` is outside `1..=18`;
/// * `physical_type` is `FixedLenByteArray(length)` and `precision` exceeds
///   `floor(log10(2^(8 * length - 1) - 1))`, or `length` is too large for that exponent
///   to be computed in an `i32`;
/// * `physical_type` is anything other than `Int32`, `Int64`, `ByteArray` or
///   `FixedLenByteArray`.
fn check_decimal_invariants(
    physical_type: &PhysicalType,
    precision: usize,
    scale: usize,
) -> Result<()> {
    if precision < 1 {
        return Err(Error::oos(format!(
            "DECIMAL precision must be larger than 0; It is {}",
            precision,
        )));
    }
    // The specification requires `scale <= precision`; only a strictly larger scale is
    // invalid.
    if scale > precision {
        return Err(Error::oos(format!(
            "Invalid DECIMAL: scale ({}) cannot be greater than precision ({})",
            scale, precision
        )));
    }

    match physical_type {
        PhysicalType::Int32 => {
            if !(1..=9).contains(&precision) {
                return Err(Error::oos(format!(
                    "Cannot represent INT32 as DECIMAL with precision {}",
                    precision
                )));
            }
        }
        PhysicalType::Int64 => {
            if !(1..=18).contains(&precision) {
                return Err(Error::oos(format!(
                    "Cannot represent INT64 as DECIMAL with precision {}",
                    precision
                )));
            }
        }
        PhysicalType::FixedLenByteArray(length) => {
            let oos_error = || Error::oos(format!("Byte Array length {} out of spec", length));
            // Checked conversion: a plain `as i32` cast would silently wrap for lengths
            // that do not fit in an `i32` and could make a huge length look like a small one.
            let length_i32 = <i32 as std::convert::TryFrom<_>>::try_from(*length)
                .map_err(|_| oos_error())?;
            // Exponent of the largest magnitude: all bits of the array except the sign bit.
            let value_bits = length_i32
                .checked_mul(8)
                .and_then(|bits| bits.checked_sub(1))
                .ok_or_else(oos_error)?;
            // Float-to-integer `as` casts saturate (and map NaN to 0), so a zero length
            // yields a maximum precision of 0 and an infinite result yields `usize::MAX`.
            let max_precision = (2f64.powi(value_bits) - 1f64).log10().floor() as usize;

            if precision > max_precision {
                return Err(Error::oos(format!(
                    "Cannot represent FIXED_LEN_BYTE_ARRAY as DECIMAL with length {} and \
                    precision {}. The max precision can only be {}",
                    length, precision, max_precision
                )));
            }
        }
        // Variable-length byte arrays place no bound on the precision here.
        PhysicalType::ByteArray => {}
        _ => {
            return Err(Error::oos(
                "DECIMAL can only annotate INT32, INT64, BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY"
                    .to_string(),
            ))
        }
    };
    Ok(())
}

/// Checks that `converted_type`, when present, is allowed to annotate `physical_type`.
///
/// A `None` converted type is always accepted. DECIMAL annotations are delegated to
/// `check_decimal_invariants`; every other annotation is accepted for exactly one
/// physical type.
///
/// # Errors
/// Returns an `Error::oos` error when the annotation is paired with any other physical
/// type, or when the DECIMAL check fails.
pub fn check_converted_invariants(
    physical_type: &PhysicalType,
    converted_type: &Option<PrimitiveConvertedType>,
) -> Result<()> {
    use PrimitiveConvertedType::*;

    // Nothing to validate when the field carries no converted type.
    let annotation = match converted_type {
        Some(annotation) => annotation,
        None => return Ok(()),
    };

    match annotation {
        Decimal(precision, scale) => check_decimal_invariants(physical_type, *precision, *scale),
        Utf8 | Bson | Json => {
            if physical_type == &PhysicalType::ByteArray {
                Ok(())
            } else {
                Err(Error::oos(format!(
                    "{:?} can only annotate BYTE_ARRAY fields",
                    annotation
                )))
            }
        }
        Enum => {
            if physical_type == &PhysicalType::ByteArray {
                Ok(())
            } else {
                Err(Error::oos(
                    "ENUM can only annotate BYTE_ARRAY fields".to_string(),
                ))
            }
        }
        Date | TimeMillis | Uint8 | Uint16 | Uint32 | Int8 | Int16 | Int32 => {
            if physical_type == &PhysicalType::Int32 {
                Ok(())
            } else {
                Err(Error::oos(format!(
                    "{:?} can only annotate INT32",
                    annotation
                )))
            }
        }
        TimeMicros | TimestampMillis | TimestampMicros | Uint64 | Int64 => {
            if physical_type == &PhysicalType::Int64 {
                Ok(())
            } else {
                Err(Error::oos(format!(
                    "{:?} can only annotate INT64",
                    annotation
                )))
            }
        }
        Interval => {
            if physical_type == &PhysicalType::FixedLenByteArray(12) {
                Ok(())
            } else {
                Err(Error::oos(
                    "INTERVAL can only annotate FIXED_LEN_BYTE_ARRAY(12)".to_string(),
                ))
            }
        }
    }
}

/// Checks that `logical_type`, when present, is allowed to annotate `physical_type`.
///
/// A `None` logical type is always accepted. DECIMAL annotations are delegated to
/// `check_decimal_invariants` regardless of the physical type.
///
/// # Errors
/// Returns an `Error::oos` error when the (logical, physical) pairing is not one of the
/// combinations accepted below, when a TIME annotation uses the millisecond unit on
/// `Int64`, or when the DECIMAL check fails.
pub fn check_logical_invariants(
    physical_type: &PhysicalType,
    logical_type: &Option<PrimitiveLogicalType>,
) -> Result<()> {
    use PrimitiveLogicalType::*;

    // Nothing to validate when the field carries no logical type.
    let annotation = match *logical_type {
        Some(annotation) => annotation,
        None => return Ok(()),
    };

    // Check that logical type and physical type are compatible
    match (annotation, physical_type) {
        (Decimal(precision, scale), _) => {
            check_decimal_invariants(physical_type, precision, scale)
        }
        // Millisecond TIME is only accepted on INT32 (see the accepted pairings below).
        (Time { unit, .. }, PhysicalType::Int64) if unit == TimeUnit::Milliseconds => Err(
            Error::oos("Cannot use millisecond unit on INT64 type".to_string()),
        ),
        // Accepted pairings. `Unknown` is the Null type.
        (Enum | String | Json | Bson, PhysicalType::ByteArray)
        | (Date | Unknown, PhysicalType::Int32)
        | (
            Time {
                unit: TimeUnit::Milliseconds,
                ..
            },
            PhysicalType::Int32,
        )
        | (Time { .. } | Timestamp { .. }, PhysicalType::Int64)
        | (
            Integer(
                IntegerType::Int8
                | IntegerType::Int16
                | IntegerType::Int32
                | IntegerType::UInt8
                | IntegerType::UInt16
                | IntegerType::UInt32,
            ),
            PhysicalType::Int32,
        )
        | (Integer(IntegerType::UInt64 | IntegerType::Int64), PhysicalType::Int64)
        // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#uuid
        | (Uuid, PhysicalType::FixedLenByteArray(16)) => Ok(()),
        (rejected_logical, rejected_physical) => Err(Error::oos(format!(
            "Cannot annotate {:?} from {:?} fields",
            rejected_logical, rejected_physical
        ))),
    }
}