use std::convert::TryInto;
use std::{
cmp::min,
io::{Read, Seek, SeekFrom},
};
use parquet_format_safe::thrift::protocol::TCompactInputProtocol;
use parquet_format_safe::FileMetaData as TFileMetaData;
use super::super::{
metadata::FileMetaData, DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE, HEADER_SIZE, PARQUET_MAGIC,
};
use crate::error::{Error, Result};
pub(super) fn metadata_len(buffer: &[u8], len: usize) -> i32 {
i32::from_le_bytes(buffer[len - 8..len - 4].try_into().unwrap())
}
/// Returns the total length of `seek` in bytes, leaving the cursor where it was.
///
/// The length is obtained by seeking to the end of the stream; the previous
/// position is then restored unless the cursor was already at the end.
///
/// # Errors
///
/// Propagates any I/O error raised by the underlying seek operations.
fn stream_len(seek: &mut impl Seek) -> std::result::Result<u64, std::io::Error> {
    let start = seek.stream_position()?;
    let end = seek.seek(SeekFrom::End(0))?;

    // Nothing to undo when measuring did not move the cursor.
    if start == end {
        return Ok(end);
    }

    seek.seek(SeekFrom::Start(start))?;
    Ok(end)
}
/// Reads a parquet file's [`FileMetaData`] from the footer at the end of `reader`.
///
/// The function first reads the last `min(DEFAULT_FOOTER_READ_SIZE, file size)`
/// bytes of the stream. From that tail it checks the trailing magic bytes and
/// decodes the metadata length stored just before them. If the whole footer
/// (`FOOTER_SIZE` plus the metadata length) is already inside the bytes read, it
/// is deserialized directly from memory; otherwise the stream is read a second
/// time, starting at the beginning of the footer.
///
/// # Errors
///
/// Returns an error if:
/// * the stream is shorter than `HEADER_SIZE + FOOTER_SIZE`;
/// * the stream yields fewer bytes than its reported length while the tail is read;
/// * the stream does not end with `PARQUET_MAGIC`;
/// * the stored metadata length is negative, or the footer would be larger than
///   the stream;
/// * an I/O operation or a buffer reservation fails;
/// * the metadata cannot be deserialized.
pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
    // The stream must at least have room for the header and the fixed-size footer.
    let file_size = stream_len(reader)?;
    if file_size < HEADER_SIZE + FOOTER_SIZE {
        return Err(Error::oos(
            "A parquet file must contain a header and footer with at least 12 bytes",
        ));
    }

    // Speculatively read a fixed-size tail so that, in the common case, the
    // metadata is obtained with a single seek + read.
    let default_end_len = min(DEFAULT_FOOTER_READ_SIZE, file_size) as usize;
    reader.seek(SeekFrom::End(-(default_end_len as i64)))?;

    let mut buffer = Vec::with_capacity(default_end_len);
    reader
        .by_ref()
        .take(default_end_len as u64)
        .read_to_end(&mut buffer)?;

    // `read_to_end` stops silently at EOF; without this guard a short read would
    // make the slice indexing below panic instead of surfacing an error.
    if buffer.len() != default_end_len {
        return Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "The file ended before its footer could be read",
        )
        .into());
    }

    // The stream must end with the magic bytes.
    if buffer[default_end_len - 4..] != PARQUET_MAGIC {
        return Err(Error::oos("The file must end with PAR1"));
    }

    // A negative stored length fails the conversion and is reported as an error.
    let metadata_len = metadata_len(&buffer, default_end_len);
    let metadata_len: u64 = metadata_len.try_into()?;

    let footer_len = FOOTER_SIZE + metadata_len;
    if footer_len > file_size {
        return Err(Error::oos(
            "The footer size must be smaller or equal to the file's size",
        ));
    }

    let reader: &[u8] = if (footer_len as usize) <= buffer.len() {
        // The whole footer is already in memory (including the case where it
        // fills the buffer exactly): no further I/O is needed.
        let remaining = buffer.len() - footer_len as usize;
        &buffer[remaining..]
    } else {
        // The speculative read was too short: read again, this time starting at
        // the beginning of the footer.
        reader.seek(SeekFrom::End(-(footer_len as i64)))?;

        buffer.clear();
        // Fallible reservation: the length comes from the file and may be large.
        buffer.try_reserve(footer_len as usize)?;
        reader.take(footer_len as u64).read_to_end(&mut buffer)?;

        &buffer
    };

    // Upper bound handed to the thrift protocol, derived from the footer size.
    let max_size = reader.len() * 2 + 1024;
    deserialize_metadata(reader, max_size)
}
/// Deserializes a [`FileMetaData`] from thrift-compact-encoded bytes in `reader`.
///
/// `max_size` is forwarded unchanged to [`TCompactInputProtocol::new`].
///
/// # Errors
///
/// Returns an error if the thrift payload cannot be read, or if the decoded
/// thrift metadata cannot be converted by [`FileMetaData::try_from_thrift`].
pub fn deserialize_metadata<R: Read>(reader: R, max_size: usize) -> Result<FileMetaData> {
    let mut protocol = TCompactInputProtocol::new(reader, max_size);

    match TFileMetaData::read_from_in_protocol(&mut protocol) {
        Ok(thrift_metadata) => FileMetaData::try_from_thrift(thrift_metadata),
        Err(err) => Err(err.into()),
    }
}