use parquet2::{
metadata::KeyValue,
schema::{
types::{
GroupLogicalType, IntegerType, ParquetType, PhysicalType, PrimitiveConvertedType,
PrimitiveLogicalType, TimeUnit as ParquetTimeUnit,
},
Repetition,
},
};
use crate::{
datatypes::{DataType, Field, Schema, TimeUnit},
error::{Error, Result},
io::ipc::write::default_ipc_fields,
io::ipc::write::schema_to_bytes,
io::parquet::write::decimal_length_from_precision,
};
use super::super::ARROW_SCHEMA_META_KEY;
/// Encodes `schema` as a parquet key-value metadata entry carrying the arrow schema.
///
/// The schema is serialized with [`schema_to_bytes`], using [`default_ipc_fields`]
/// for its fields. The bytes are framed as `[0xFF; 4] ++ len.to_le_bytes() ++ payload`,
/// where `len` is the payload length cast to `u32`, and the framed buffer is
/// base64-encoded. The returned entry's key is `ARROW_SCHEMA_META_KEY`.
pub fn schema_to_metadata_key(schema: &Schema) -> KeyValue {
    // Four 0xFF bytes written ahead of the little-endian length prefix.
    const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];

    let ipc_fields = default_ipc_fields(&schema.fields);
    let payload = schema_to_bytes(schema, &ipc_fields);
    // NOTE(review): `as` silently truncates payloads longer than u32::MAX bytes.
    let length_prefix = (payload.len() as u32).to_le_bytes();

    // Slice iterators report exact sizes, so `collect` allocates the buffer once.
    let framed: Vec<u8> = CONTINUATION_MARKER
        .iter()
        .chain(&length_prefix)
        .chain(&payload)
        .copied()
        .collect();

    KeyValue {
        key: ARROW_SCHEMA_META_KEY.to_string(),
        value: Some(base64::encode(&framed)),
    }
}
pub fn to_parquet_type(field: &Field) -> Result<ParquetType> {
let name = field.name.clone();
let repetition = if field.is_nullable {
Repetition::Optional
} else {
Repetition::Required
};
match field.data_type().to_logical_type() {
DataType::Null => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::Int32,
repetition,
None,
Some(PrimitiveLogicalType::Unknown),
None,
)?),
DataType::Boolean => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::Boolean,
repetition,
None,
None,
None,
)?),
DataType::Int32 => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::Int32,
repetition,
None,
None,
None,
)?),
DataType::Int64 | DataType::Duration(_) => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::Int64,
repetition,
None,
None,
None,
)?),
DataType::Date64 => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::Int64,
repetition,
None,
None,
None,
)?),
DataType::Float32 => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::Float,
repetition,
None,
None,
None,
)?),
DataType::Float64 => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::Double,
repetition,
None,
None,
None,
)?),
DataType::Binary | DataType::LargeBinary => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::ByteArray,
repetition,
None,
None,
None,
)?),
DataType::Utf8 | DataType::LargeUtf8 => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::ByteArray,
repetition,
Some(PrimitiveConvertedType::Utf8),
Some(PrimitiveLogicalType::String),
None,
)?),
DataType::Date32 => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::Int32,
repetition,
Some(PrimitiveConvertedType::Date),
Some(PrimitiveLogicalType::Date),
None,
)?),
DataType::Int8 => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::Int32,
repetition,
Some(PrimitiveConvertedType::Int8),
Some(PrimitiveLogicalType::Integer(IntegerType::Int8)),
None,
)?),
DataType::Int16 => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::Int32,
repetition,
Some(PrimitiveConvertedType::Int16),
Some(PrimitiveLogicalType::Integer(IntegerType::Int16)),
None,
)?),
DataType::UInt8 => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::Int32,
repetition,
Some(PrimitiveConvertedType::Uint8),
Some(PrimitiveLogicalType::Integer(IntegerType::UInt8)),
None,
)?),
DataType::UInt16 => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::Int32,
repetition,
Some(PrimitiveConvertedType::Uint16),
Some(PrimitiveLogicalType::Integer(IntegerType::UInt16)),
None,
)?),
DataType::UInt32 => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::Int32,
repetition,
Some(PrimitiveConvertedType::Uint32),
Some(PrimitiveLogicalType::Integer(IntegerType::UInt32)),
None,
)?),
DataType::UInt64 => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::Int64,
repetition,
Some(PrimitiveConvertedType::Uint64),
Some(PrimitiveLogicalType::Integer(IntegerType::UInt64)),
None,
)?),
DataType::Timestamp(TimeUnit::Second, _) => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::Int64,
repetition,
None,
None,
None,
)?),
DataType::Timestamp(time_unit, zone) => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::Int64,
repetition,
None,
Some(PrimitiveLogicalType::Timestamp {
is_adjusted_to_utc: matches!(zone, Some(z) if !z.as_str().is_empty()),
unit: match time_unit {
TimeUnit::Second => unreachable!(),
TimeUnit::Millisecond => ParquetTimeUnit::Milliseconds,
TimeUnit::Microsecond => ParquetTimeUnit::Microseconds,
TimeUnit::Nanosecond => ParquetTimeUnit::Nanoseconds,
},
}),
None,
)?),
DataType::Time32(TimeUnit::Second) => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::Int32,
repetition,
None,
None,
None,
)?),
DataType::Time32(TimeUnit::Millisecond) => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::Int32,
repetition,
Some(PrimitiveConvertedType::TimeMillis),
Some(PrimitiveLogicalType::Time {
is_adjusted_to_utc: false,
unit: ParquetTimeUnit::Milliseconds,
}),
None,
)?),
DataType::Time64(time_unit) => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::Int64,
repetition,
match time_unit {
TimeUnit::Microsecond => Some(PrimitiveConvertedType::TimeMicros),
TimeUnit::Nanosecond => None,
_ => unreachable!(),
},
Some(PrimitiveLogicalType::Time {
is_adjusted_to_utc: false,
unit: match time_unit {
TimeUnit::Microsecond => ParquetTimeUnit::Microseconds,
TimeUnit::Nanosecond => ParquetTimeUnit::Nanoseconds,
_ => unreachable!(),
},
}),
None,
)?),
DataType::Struct(fields) => {
if fields.is_empty() {
return Err(Error::InvalidArgumentError(
"Parquet does not support writing empty structs".to_string(),
));
}
let fields = fields
.iter()
.map(to_parquet_type)
.collect::<Result<Vec<_>>>()?;
Ok(ParquetType::from_group(
name, repetition, None, None, fields, None,
))
}
DataType::Dictionary(_, value, _) => {
let dict_field = Field::new(name.as_str(), value.as_ref().clone(), field.is_nullable);
to_parquet_type(&dict_field)
}
DataType::FixedSizeBinary(size) => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::FixedLenByteArray(*size),
repetition,
None,
None,
None,
)?),
DataType::Decimal(precision, scale) => {
let precision = *precision;
let scale = *scale;
let logical_type = Some(PrimitiveLogicalType::Decimal(precision, scale));
let physical_type = if precision <= 9 {
PhysicalType::Int32
} else if precision <= 18 {
PhysicalType::Int64
} else {
let len = decimal_length_from_precision(precision);
PhysicalType::FixedLenByteArray(len)
};
Ok(ParquetType::try_from_primitive(
name,
physical_type,
repetition,
Some(PrimitiveConvertedType::Decimal(precision, scale)),
logical_type,
None,
)?)
}
DataType::Interval(_) => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::FixedLenByteArray(12),
repetition,
Some(PrimitiveConvertedType::Interval),
None,
None,
)?),
DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
Ok(ParquetType::from_group(
name,
repetition,
None,
Some(GroupLogicalType::List),
vec![ParquetType::from_group(
"list".to_string(),
Repetition::Repeated,
None,
None,
vec![to_parquet_type(f)?],
None,
)],
None,
))
}
other => Err(Error::NotYetImplemented(format!(
"Writing the data type {other:?} is not yet implemented"
))),
}
}