use std::io::SeekFrom;
use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use super::super::{metadata::FileMetaData, DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE, PARQUET_MAGIC};
use super::metadata::{deserialize_metadata, metadata_len};
use crate::error::{Error, Result};
use crate::HEADER_SIZE;
async fn stream_len(
seek: &mut (impl AsyncSeek + std::marker::Unpin),
) -> std::result::Result<u64, std::io::Error> {
let old_pos = seek.seek(SeekFrom::Current(0)).await?;
let len = seek.seek(SeekFrom::End(0)).await?;
if old_pos != len {
seek.seek(SeekFrom::Start(old_pos)).await?;
}
Ok(len)
}
pub async fn read_metadata<R: AsyncRead + AsyncSeek + Send + std::marker::Unpin>(
reader: &mut R,
) -> Result<FileMetaData> {
let file_size = stream_len(reader).await?;
if file_size < HEADER_SIZE + FOOTER_SIZE {
return Err(Error::oos(
"A parquet file must containt a header and footer with at least 12 bytes",
));
}
let default_end_len = std::cmp::min(DEFAULT_FOOTER_READ_SIZE, file_size) as usize;
reader
.seek(SeekFrom::End(-(default_end_len as i64)))
.await?;
let mut buffer = vec![];
buffer.try_reserve(default_end_len)?;
reader
.take(default_end_len as u64)
.read_to_end(&mut buffer)
.await?;
if buffer[default_end_len - 4..] != PARQUET_MAGIC {
return Err(Error::oos("Invalid Parquet file. Corrupt footer"));
}
let metadata_len = metadata_len(&buffer, default_end_len);
let metadata_len: u64 = metadata_len.try_into()?;
let footer_len = FOOTER_SIZE + metadata_len;
if footer_len > file_size {
return Err(Error::oos(
"The footer size must be smaller or equal to the file's size",
));
}
let reader = if (footer_len as usize) < buffer.len() {
let remaining = buffer.len() - footer_len as usize;
&buffer[remaining..]
} else {
reader.seek(SeekFrom::End(-(footer_len as i64))).await?;
buffer.clear();
buffer.try_reserve(footer_len as usize)?;
reader
.take(footer_len as u64)
.read_to_end(&mut buffer)
.await?;
&buffer
};
let max_size = reader.len() * 2 + 1024;
deserialize_metadata(reader, max_size)
}