1use crate::basic::{ConvertedType, LogicalType, TimeUnit as ParquetTimeUnit, Type as PhysicalType};
19use crate::errors::{ParquetError, Result};
20use crate::schema::types::{BasicTypeInfo, Type};
21use arrow_schema::{DataType, IntervalUnit, TimeUnit, DECIMAL128_MAX_PRECISION};
22
23pub fn convert_primitive(
28 parquet_type: &Type,
29 arrow_type_hint: Option<DataType>,
30) -> Result<DataType> {
31 let physical_type = from_parquet(parquet_type)?;
32 Ok(match arrow_type_hint {
33 Some(hint) => apply_hint(physical_type, hint),
34 None => physical_type,
35 })
36}
37
38fn apply_hint(parquet: DataType, hint: DataType) -> DataType {
41 match (&parquet, &hint) {
42 (DataType::Int32 | DataType::Int64, DataType::Timestamp(_, _)) => hint,
44 (DataType::Int32, DataType::Time32(_)) => hint,
45 (DataType::Int64, DataType::Time64(_)) => hint,
46
47 (DataType::Int64, DataType::Date64) => hint,
49
50 (DataType::Date32, DataType::Date64) => hint,
52
53 (DataType::Timestamp(p, _), DataType::Timestamp(h, Some(_))) if p == h => hint,
55
56 (DataType::Utf8, DataType::LargeUtf8) => hint,
58 (DataType::Binary, DataType::LargeBinary) => hint,
59
60 (DataType::Binary, DataType::Utf8) => hint,
62 (DataType::Binary, DataType::LargeUtf8) => hint,
63 (DataType::Binary, DataType::Utf8View) => hint,
64
65 (DataType::Utf8, DataType::Utf8View) => hint,
67 (DataType::Binary, DataType::BinaryView) => hint,
68
69 (DataType::Interval(_), DataType::Interval(_)) => hint,
71
72 (DataType::Decimal128(_, _), DataType::Decimal256(_, _)) => hint,
74
75 (_, DataType::Dictionary(_, value)) => {
77 let hinted = apply_hint(parquet, value.as_ref().clone());
79
80 match &hinted == value.as_ref() {
83 true => hint,
84 false => hinted,
85 }
86 }
87 _ => parquet,
88 }
89}
90
91fn from_parquet(parquet_type: &Type) -> Result<DataType> {
92 match parquet_type {
93 Type::PrimitiveType {
94 physical_type,
95 basic_info,
96 type_length,
97 scale,
98 precision,
99 ..
100 } => match physical_type {
101 PhysicalType::BOOLEAN => Ok(DataType::Boolean),
102 PhysicalType::INT32 => from_int32(basic_info, *scale, *precision),
103 PhysicalType::INT64 => from_int64(basic_info, *scale, *precision),
104 PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
105 PhysicalType::FLOAT => Ok(DataType::Float32),
106 PhysicalType::DOUBLE => Ok(DataType::Float64),
107 PhysicalType::BYTE_ARRAY => from_byte_array(basic_info, *precision, *scale),
108 PhysicalType::FIXED_LEN_BYTE_ARRAY => {
109 from_fixed_len_byte_array(basic_info, *scale, *precision, *type_length)
110 }
111 },
112 Type::GroupType { .. } => unreachable!(),
113 }
114}
115
116fn decimal_type(scale: i32, precision: i32) -> Result<DataType> {
117 if precision <= DECIMAL128_MAX_PRECISION as _ {
118 decimal_128_type(scale, precision)
119 } else {
120 decimal_256_type(scale, precision)
121 }
122}
123
124fn decimal_128_type(scale: i32, precision: i32) -> Result<DataType> {
125 let scale = scale
126 .try_into()
127 .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
128
129 let precision = precision
130 .try_into()
131 .map_err(|_| arrow_err!("precision cannot be negative: {}", precision))?;
132
133 Ok(DataType::Decimal128(precision, scale))
134}
135
136fn decimal_256_type(scale: i32, precision: i32) -> Result<DataType> {
137 let scale = scale
138 .try_into()
139 .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
140
141 let precision = precision
142 .try_into()
143 .map_err(|_| arrow_err!("precision cannot be negative: {}", precision))?;
144
145 Ok(DataType::Decimal256(precision, scale))
146}
147
148fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
149 match (info.logical_type(), info.converted_type()) {
150 (None, ConvertedType::NONE) => Ok(DataType::Int32),
151 (
152 Some(
153 ref t @ LogicalType::Integer {
154 bit_width,
155 is_signed,
156 },
157 ),
158 _,
159 ) => match (bit_width, is_signed) {
160 (8, true) => Ok(DataType::Int8),
161 (16, true) => Ok(DataType::Int16),
162 (32, true) => Ok(DataType::Int32),
163 (8, false) => Ok(DataType::UInt8),
164 (16, false) => Ok(DataType::UInt16),
165 (32, false) => Ok(DataType::UInt32),
166 _ => Err(arrow_err!("Cannot create INT32 physical type from {:?}", t)),
167 },
168 (Some(LogicalType::Decimal { scale, precision }), _) => decimal_128_type(scale, precision),
169 (Some(LogicalType::Date), _) => Ok(DataType::Date32),
170 (Some(LogicalType::Time { unit, .. }), _) => match unit {
171 ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)),
172 _ => Err(arrow_err!(
173 "Cannot create INT32 physical type from {:?}",
174 unit
175 )),
176 },
177 (Some(LogicalType::Unknown), _) => Ok(DataType::Null),
179 (None, ConvertedType::UINT_8) => Ok(DataType::UInt8),
180 (None, ConvertedType::UINT_16) => Ok(DataType::UInt16),
181 (None, ConvertedType::UINT_32) => Ok(DataType::UInt32),
182 (None, ConvertedType::INT_8) => Ok(DataType::Int8),
183 (None, ConvertedType::INT_16) => Ok(DataType::Int16),
184 (None, ConvertedType::INT_32) => Ok(DataType::Int32),
185 (None, ConvertedType::DATE) => Ok(DataType::Date32),
186 (None, ConvertedType::TIME_MILLIS) => Ok(DataType::Time32(TimeUnit::Millisecond)),
187 (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision),
188 (logical, converted) => Err(arrow_err!(
189 "Unable to convert parquet INT32 logical type {:?} or converted type {}",
190 logical,
191 converted
192 )),
193 }
194}
195
196fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
197 match (info.logical_type(), info.converted_type()) {
198 (None, ConvertedType::NONE) => Ok(DataType::Int64),
199 (
200 Some(LogicalType::Integer {
201 bit_width: 64,
202 is_signed,
203 }),
204 _,
205 ) => match is_signed {
206 true => Ok(DataType::Int64),
207 false => Ok(DataType::UInt64),
208 },
209 (Some(LogicalType::Time { unit, .. }), _) => match unit {
210 ParquetTimeUnit::MILLIS(_) => {
211 Err(arrow_err!("Cannot create INT64 from MILLIS time unit",))
212 }
213 ParquetTimeUnit::MICROS(_) => Ok(DataType::Time64(TimeUnit::Microsecond)),
214 ParquetTimeUnit::NANOS(_) => Ok(DataType::Time64(TimeUnit::Nanosecond)),
215 },
216 (
217 Some(LogicalType::Timestamp {
218 is_adjusted_to_u_t_c,
219 unit,
220 }),
221 _,
222 ) => Ok(DataType::Timestamp(
223 match unit {
224 ParquetTimeUnit::MILLIS(_) => TimeUnit::Millisecond,
225 ParquetTimeUnit::MICROS(_) => TimeUnit::Microsecond,
226 ParquetTimeUnit::NANOS(_) => TimeUnit::Nanosecond,
227 },
228 if is_adjusted_to_u_t_c {
229 Some("UTC".into())
230 } else {
231 None
232 },
233 )),
234 (None, ConvertedType::INT_64) => Ok(DataType::Int64),
235 (None, ConvertedType::UINT_64) => Ok(DataType::UInt64),
236 (None, ConvertedType::TIME_MICROS) => Ok(DataType::Time64(TimeUnit::Microsecond)),
237 (None, ConvertedType::TIMESTAMP_MILLIS) => Ok(DataType::Timestamp(
238 TimeUnit::Millisecond,
239 Some("UTC".into()),
240 )),
241 (None, ConvertedType::TIMESTAMP_MICROS) => Ok(DataType::Timestamp(
242 TimeUnit::Microsecond,
243 Some("UTC".into()),
244 )),
245 (Some(LogicalType::Decimal { scale, precision }), _) => decimal_128_type(scale, precision),
246 (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision),
247 (logical, converted) => Err(arrow_err!(
248 "Unable to convert parquet INT64 logical type {:?} or converted type {}",
249 logical,
250 converted
251 )),
252 }
253}
254
255fn from_byte_array(info: &BasicTypeInfo, precision: i32, scale: i32) -> Result<DataType> {
256 match (info.logical_type(), info.converted_type()) {
257 (Some(LogicalType::String), _) => Ok(DataType::Utf8),
258 (Some(LogicalType::Json), _) => Ok(DataType::Utf8),
259 (Some(LogicalType::Bson), _) => Ok(DataType::Binary),
260 (Some(LogicalType::Enum), _) => Ok(DataType::Binary),
261 (None, ConvertedType::NONE) => Ok(DataType::Binary),
262 (None, ConvertedType::JSON) => Ok(DataType::Utf8),
263 (None, ConvertedType::BSON) => Ok(DataType::Binary),
264 (None, ConvertedType::ENUM) => Ok(DataType::Binary),
265 (None, ConvertedType::UTF8) => Ok(DataType::Utf8),
266 (
267 Some(LogicalType::Decimal {
268 scale: s,
269 precision: p,
270 }),
271 _,
272 ) => decimal_type(s, p),
273 (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
274 (logical, converted) => Err(arrow_err!(
275 "Unable to convert parquet BYTE_ARRAY logical type {:?} or converted type {}",
276 logical,
277 converted
278 )),
279 }
280}
281
282fn from_fixed_len_byte_array(
283 info: &BasicTypeInfo,
284 scale: i32,
285 precision: i32,
286 type_length: i32,
287) -> Result<DataType> {
288 match (info.logical_type(), info.converted_type()) {
290 (Some(LogicalType::Decimal { scale, precision }), _) => {
291 if type_length <= 16 {
292 decimal_128_type(scale, precision)
293 } else {
294 decimal_256_type(scale, precision)
295 }
296 }
297 (None, ConvertedType::DECIMAL) => {
298 if type_length <= 16 {
299 decimal_128_type(scale, precision)
300 } else {
301 decimal_256_type(scale, precision)
302 }
303 }
304 (None, ConvertedType::INTERVAL) => {
305 Ok(DataType::Interval(IntervalUnit::DayTime))
309 }
310 (Some(LogicalType::Float16), _) => {
311 if type_length == 2 {
312 Ok(DataType::Float16)
313 } else {
314 Err(ParquetError::General(
315 "FLOAT16 logical type must be Fixed Length Byte Array with length 2"
316 .to_string(),
317 ))
318 }
319 }
320 _ => Ok(DataType::FixedSizeBinary(type_length)),
321 }
322}