parquet/arrow/schema/
primitive.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::basic::{ConvertedType, LogicalType, TimeUnit as ParquetTimeUnit, Type as PhysicalType};
19use crate::errors::{ParquetError, Result};
20use crate::schema::types::{BasicTypeInfo, Type};
21use arrow_schema::{DataType, IntervalUnit, TimeUnit, DECIMAL128_MAX_PRECISION};
22
23/// Converts [`Type`] to [`DataType`] with an optional `arrow_type_hint`
24/// provided by the arrow schema
25///
26/// Note: the values embedded in the schema are advisory,
27pub fn convert_primitive(
28    parquet_type: &Type,
29    arrow_type_hint: Option<DataType>,
30) -> Result<DataType> {
31    let physical_type = from_parquet(parquet_type)?;
32    Ok(match arrow_type_hint {
33        Some(hint) => apply_hint(physical_type, hint),
34        None => physical_type,
35    })
36}
37
38/// Uses an type hint from the embedded arrow schema to aid in faithfully
39/// reproducing the data as it was written into parquet
40fn apply_hint(parquet: DataType, hint: DataType) -> DataType {
41    match (&parquet, &hint) {
42        // Not all time units can be represented as LogicalType / ConvertedType
43        (DataType::Int32 | DataType::Int64, DataType::Timestamp(_, _)) => hint,
44        (DataType::Int32, DataType::Time32(_)) => hint,
45        (DataType::Int64, DataType::Time64(_)) => hint,
46
47        // Date64 doesn't have a corresponding LogicalType / ConvertedType
48        (DataType::Int64, DataType::Date64) => hint,
49
50        // Coerce Date32 back to Date64 (#1666)
51        (DataType::Date32, DataType::Date64) => hint,
52
53        // Determine timezone
54        (DataType::Timestamp(p, _), DataType::Timestamp(h, Some(_))) if p == h => hint,
55
56        // Determine offset size
57        (DataType::Utf8, DataType::LargeUtf8) => hint,
58        (DataType::Binary, DataType::LargeBinary) => hint,
59
60        // Read as Utf8
61        (DataType::Binary, DataType::Utf8) => hint,
62        (DataType::Binary, DataType::LargeUtf8) => hint,
63        (DataType::Binary, DataType::Utf8View) => hint,
64
65        // Determine view type
66        (DataType::Utf8, DataType::Utf8View) => hint,
67        (DataType::Binary, DataType::BinaryView) => hint,
68
69        // Determine interval time unit (#1666)
70        (DataType::Interval(_), DataType::Interval(_)) => hint,
71
72        // Promote to Decimal256
73        (DataType::Decimal128(_, _), DataType::Decimal256(_, _)) => hint,
74
75        // Potentially preserve dictionary encoding
76        (_, DataType::Dictionary(_, value)) => {
77            // Apply hint to inner type
78            let hinted = apply_hint(parquet, value.as_ref().clone());
79
80            // If matches dictionary value - preserve dictionary
81            // otherwise use hinted inner type
82            match &hinted == value.as_ref() {
83                true => hint,
84                false => hinted,
85            }
86        }
87        _ => parquet,
88    }
89}
90
91fn from_parquet(parquet_type: &Type) -> Result<DataType> {
92    match parquet_type {
93        Type::PrimitiveType {
94            physical_type,
95            basic_info,
96            type_length,
97            scale,
98            precision,
99            ..
100        } => match physical_type {
101            PhysicalType::BOOLEAN => Ok(DataType::Boolean),
102            PhysicalType::INT32 => from_int32(basic_info, *scale, *precision),
103            PhysicalType::INT64 => from_int64(basic_info, *scale, *precision),
104            PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
105            PhysicalType::FLOAT => Ok(DataType::Float32),
106            PhysicalType::DOUBLE => Ok(DataType::Float64),
107            PhysicalType::BYTE_ARRAY => from_byte_array(basic_info, *precision, *scale),
108            PhysicalType::FIXED_LEN_BYTE_ARRAY => {
109                from_fixed_len_byte_array(basic_info, *scale, *precision, *type_length)
110            }
111        },
112        Type::GroupType { .. } => unreachable!(),
113    }
114}
115
116fn decimal_type(scale: i32, precision: i32) -> Result<DataType> {
117    if precision <= DECIMAL128_MAX_PRECISION as _ {
118        decimal_128_type(scale, precision)
119    } else {
120        decimal_256_type(scale, precision)
121    }
122}
123
124fn decimal_128_type(scale: i32, precision: i32) -> Result<DataType> {
125    let scale = scale
126        .try_into()
127        .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
128
129    let precision = precision
130        .try_into()
131        .map_err(|_| arrow_err!("precision cannot be negative: {}", precision))?;
132
133    Ok(DataType::Decimal128(precision, scale))
134}
135
136fn decimal_256_type(scale: i32, precision: i32) -> Result<DataType> {
137    let scale = scale
138        .try_into()
139        .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
140
141    let precision = precision
142        .try_into()
143        .map_err(|_| arrow_err!("precision cannot be negative: {}", precision))?;
144
145    Ok(DataType::Decimal256(precision, scale))
146}
147
148fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
149    match (info.logical_type(), info.converted_type()) {
150        (None, ConvertedType::NONE) => Ok(DataType::Int32),
151        (
152            Some(
153                ref t @ LogicalType::Integer {
154                    bit_width,
155                    is_signed,
156                },
157            ),
158            _,
159        ) => match (bit_width, is_signed) {
160            (8, true) => Ok(DataType::Int8),
161            (16, true) => Ok(DataType::Int16),
162            (32, true) => Ok(DataType::Int32),
163            (8, false) => Ok(DataType::UInt8),
164            (16, false) => Ok(DataType::UInt16),
165            (32, false) => Ok(DataType::UInt32),
166            _ => Err(arrow_err!("Cannot create INT32 physical type from {:?}", t)),
167        },
168        (Some(LogicalType::Decimal { scale, precision }), _) => decimal_128_type(scale, precision),
169        (Some(LogicalType::Date), _) => Ok(DataType::Date32),
170        (Some(LogicalType::Time { unit, .. }), _) => match unit {
171            ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)),
172            _ => Err(arrow_err!(
173                "Cannot create INT32 physical type from {:?}",
174                unit
175            )),
176        },
177        // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#unknown-always-null
178        (Some(LogicalType::Unknown), _) => Ok(DataType::Null),
179        (None, ConvertedType::UINT_8) => Ok(DataType::UInt8),
180        (None, ConvertedType::UINT_16) => Ok(DataType::UInt16),
181        (None, ConvertedType::UINT_32) => Ok(DataType::UInt32),
182        (None, ConvertedType::INT_8) => Ok(DataType::Int8),
183        (None, ConvertedType::INT_16) => Ok(DataType::Int16),
184        (None, ConvertedType::INT_32) => Ok(DataType::Int32),
185        (None, ConvertedType::DATE) => Ok(DataType::Date32),
186        (None, ConvertedType::TIME_MILLIS) => Ok(DataType::Time32(TimeUnit::Millisecond)),
187        (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision),
188        (logical, converted) => Err(arrow_err!(
189            "Unable to convert parquet INT32 logical type {:?} or converted type {}",
190            logical,
191            converted
192        )),
193    }
194}
195
196fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
197    match (info.logical_type(), info.converted_type()) {
198        (None, ConvertedType::NONE) => Ok(DataType::Int64),
199        (
200            Some(LogicalType::Integer {
201                bit_width: 64,
202                is_signed,
203            }),
204            _,
205        ) => match is_signed {
206            true => Ok(DataType::Int64),
207            false => Ok(DataType::UInt64),
208        },
209        (Some(LogicalType::Time { unit, .. }), _) => match unit {
210            ParquetTimeUnit::MILLIS(_) => {
211                Err(arrow_err!("Cannot create INT64 from MILLIS time unit",))
212            }
213            ParquetTimeUnit::MICROS(_) => Ok(DataType::Time64(TimeUnit::Microsecond)),
214            ParquetTimeUnit::NANOS(_) => Ok(DataType::Time64(TimeUnit::Nanosecond)),
215        },
216        (
217            Some(LogicalType::Timestamp {
218                is_adjusted_to_u_t_c,
219                unit,
220            }),
221            _,
222        ) => Ok(DataType::Timestamp(
223            match unit {
224                ParquetTimeUnit::MILLIS(_) => TimeUnit::Millisecond,
225                ParquetTimeUnit::MICROS(_) => TimeUnit::Microsecond,
226                ParquetTimeUnit::NANOS(_) => TimeUnit::Nanosecond,
227            },
228            if is_adjusted_to_u_t_c {
229                Some("UTC".into())
230            } else {
231                None
232            },
233        )),
234        (None, ConvertedType::INT_64) => Ok(DataType::Int64),
235        (None, ConvertedType::UINT_64) => Ok(DataType::UInt64),
236        (None, ConvertedType::TIME_MICROS) => Ok(DataType::Time64(TimeUnit::Microsecond)),
237        (None, ConvertedType::TIMESTAMP_MILLIS) => Ok(DataType::Timestamp(
238            TimeUnit::Millisecond,
239            Some("UTC".into()),
240        )),
241        (None, ConvertedType::TIMESTAMP_MICROS) => Ok(DataType::Timestamp(
242            TimeUnit::Microsecond,
243            Some("UTC".into()),
244        )),
245        (Some(LogicalType::Decimal { scale, precision }), _) => decimal_128_type(scale, precision),
246        (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision),
247        (logical, converted) => Err(arrow_err!(
248            "Unable to convert parquet INT64 logical type {:?} or converted type {}",
249            logical,
250            converted
251        )),
252    }
253}
254
255fn from_byte_array(info: &BasicTypeInfo, precision: i32, scale: i32) -> Result<DataType> {
256    match (info.logical_type(), info.converted_type()) {
257        (Some(LogicalType::String), _) => Ok(DataType::Utf8),
258        (Some(LogicalType::Json), _) => Ok(DataType::Utf8),
259        (Some(LogicalType::Bson), _) => Ok(DataType::Binary),
260        (Some(LogicalType::Enum), _) => Ok(DataType::Binary),
261        (None, ConvertedType::NONE) => Ok(DataType::Binary),
262        (None, ConvertedType::JSON) => Ok(DataType::Utf8),
263        (None, ConvertedType::BSON) => Ok(DataType::Binary),
264        (None, ConvertedType::ENUM) => Ok(DataType::Binary),
265        (None, ConvertedType::UTF8) => Ok(DataType::Utf8),
266        (
267            Some(LogicalType::Decimal {
268                scale: s,
269                precision: p,
270            }),
271            _,
272        ) => decimal_type(s, p),
273        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
274        (logical, converted) => Err(arrow_err!(
275            "Unable to convert parquet BYTE_ARRAY logical type {:?} or converted type {}",
276            logical,
277            converted
278        )),
279    }
280}
281
282fn from_fixed_len_byte_array(
283    info: &BasicTypeInfo,
284    scale: i32,
285    precision: i32,
286    type_length: i32,
287) -> Result<DataType> {
288    // TODO: This should check the type length for the decimal and interval types
289    match (info.logical_type(), info.converted_type()) {
290        (Some(LogicalType::Decimal { scale, precision }), _) => {
291            if type_length <= 16 {
292                decimal_128_type(scale, precision)
293            } else {
294                decimal_256_type(scale, precision)
295            }
296        }
297        (None, ConvertedType::DECIMAL) => {
298            if type_length <= 16 {
299                decimal_128_type(scale, precision)
300            } else {
301                decimal_256_type(scale, precision)
302            }
303        }
304        (None, ConvertedType::INTERVAL) => {
305            // There is currently no reliable way of determining which IntervalUnit
306            // to return. Thus without the original Arrow schema, the results
307            // would be incorrect if all 12 bytes of the interval are populated
308            Ok(DataType::Interval(IntervalUnit::DayTime))
309        }
310        (Some(LogicalType::Float16), _) => {
311            if type_length == 2 {
312                Ok(DataType::Float16)
313            } else {
314                Err(ParquetError::General(
315                    "FLOAT16 logical type must be Fixed Length Byte Array with length 2"
316                        .to_string(),
317                ))
318            }
319        }
320        _ => Ok(DataType::FixedSizeBinary(type_length)),
321    }
322}