use std::io::Write;
use parquet_format_safe::thrift::protocol::TCompactOutputProtocol;
use parquet_format_safe::thrift::protocol::TOutputProtocol;
use parquet_format_safe::RowGroup;
use crate::metadata::ThriftFileMetaData;
use crate::{
error::{Error, Result},
metadata::SchemaDescriptor,
FOOTER_SIZE, PARQUET_MAGIC,
};
use super::indexes::{write_column_index, write_offset_index};
use super::page::PageWriteSpec;
use super::{row_group::write_row_group, RowGroupIter, WriteOptions};
pub use crate::metadata::KeyValue;
use crate::write::State;
/// Writes the leading parquet magic bytes (`PARQUET_MAGIC`) to `writer`.
///
/// Returns the number of bytes written, i.e. the length of `PARQUET_MAGIC`.
///
/// # Errors
/// Propagates any I/O error from `writer`.
pub(super) fn start_file<W>(writer: &mut W) -> Result<u64>
where
    W: Write,
{
    let magic: &[u8] = &PARQUET_MAGIC;
    writer.write_all(magic)?;
    let written = magic.len() as u64;
    Ok(written)
}
pub(super) fn end_file<W: Write>(mut writer: &mut W, metadata: &ThriftFileMetaData) -> Result<u64> {
let mut protocol = TCompactOutputProtocol::new(&mut writer);
let metadata_len = metadata.write_to_out_protocol(&mut protocol)? as i32;
protocol.flush()?;
let metadata_bytes = metadata_len.to_le_bytes();
let mut footer_buffer = [0u8; FOOTER_SIZE as usize];
(0..4).for_each(|i| {
footer_buffer[i] = metadata_bytes[i];
});
(&mut footer_buffer[4..]).write_all(&PARQUET_MAGIC)?;
writer.write_all(&footer_buffer)?;
Ok(metadata_len as u64 + FOOTER_SIZE)
}
/// Writes a parquet file to `W` incrementally, one row group at a time.
///
/// Call [`FileWriter::write`] for each row group, then [`FileWriter::end`] once
/// to emit the page indexes and the footer.
pub struct FileWriter<W>
where
    W: Write,
{
    /// Destination of every byte of the file.
    writer: W,
    /// Schema of the file being written.
    schema: SchemaDescriptor,
    /// Options controlling how the file is written (e.g. `write_statistics`, `version`).
    options: WriteOptions,
    /// Optional `created_by` string passed to the file metadata when the file ends.
    created_by: Option<String>,
    /// Number of bytes written so far, which is also the offset of the next byte.
    offset: u64,
    /// Thrift metadata of each row group written so far.
    row_groups: Vec<RowGroup>,
    /// For each row group, for each column, the specs of the pages written;
    /// used to build the page indexes when the file ends.
    page_specs: Vec<Vec<Vec<PageWriteSpec>>>,
    /// Lifecycle state of the writer.
    state: State,
    /// File metadata, set once the file has been ended successfully.
    metadata: Option<ThriftFileMetaData>,
}
/// Writes a metadata-only parquet file: the leading magic bytes, the
/// thrift-encoded `metadata` and the footer, with no row-group data in between.
///
/// Returns the total number of bytes written.
///
/// # Errors
/// Propagates errors from writing the magic bytes, metadata or footer.
pub fn write_metadata_sidecar<W>(writer: &mut W, metadata: &ThriftFileMetaData) -> Result<u64>
where
    W: Write,
{
    let header_len = start_file(writer)?;
    let footer_len = end_file(writer, metadata)?;
    Ok(header_len + footer_len)
}
impl<W> FileWriter<W>
where
    W: Write,
{
    /// Returns the [`WriteOptions`] this writer was constructed with.
    pub fn options(&self) -> &WriteOptions {
        &self.options
    }

    /// Returns the [`SchemaDescriptor`] of the file being written.
    pub fn schema(&self) -> &SchemaDescriptor {
        &self.schema
    }

    /// Returns the file metadata written by [`FileWriter::end`], or `None` if the
    /// file has not been ended successfully yet.
    pub fn metadata(&self) -> Option<&ThriftFileMetaData> {
        Option::as_ref(&self.metadata)
    }
}
impl<W: Write> FileWriter<W> {
pub fn new(
writer: W,
schema: SchemaDescriptor,
options: WriteOptions,
created_by: Option<String>,
) -> Self {
Self {
writer,
schema,
options,
created_by,
offset: 0,
row_groups: vec![],
page_specs: vec![],
state: State::Initialised,
metadata: None,
}
}
fn start(&mut self) -> Result<()> {
if self.offset == 0 {
self.offset = start_file(&mut self.writer)? as u64;
self.state = State::Started;
Ok(())
} else {
Err(Error::InvalidParameter(
"Start cannot be called twice".to_string(),
))
}
}
pub fn write<E>(&mut self, row_group: RowGroupIter<'_, E>) -> Result<()>
where
Error: From<E>,
E: std::error::Error,
{
if self.offset == 0 {
self.start()?;
}
let ordinal = self.row_groups.len();
let (group, specs, size) = write_row_group(
&mut self.writer,
self.offset,
self.schema.columns(),
row_group,
ordinal,
)?;
self.offset += size;
self.row_groups.push(group);
self.page_specs.push(specs);
Ok(())
}
pub fn end(&mut self, key_value_metadata: Option<Vec<KeyValue>>) -> Result<u64> {
if self.offset == 0 {
self.start()?;
}
if self.state != State::Started {
return Err(Error::InvalidParameter(
"End cannot be called twice".to_string(),
));
}
let num_rows = self.row_groups.iter().map(|group| group.num_rows).sum();
if self.options.write_statistics {
self.row_groups
.iter_mut()
.zip(self.page_specs.iter())
.try_for_each(|(group, pages)| {
group.columns.iter_mut().zip(pages.iter()).try_for_each(
|(column, pages)| {
let offset = self.offset;
column.column_index_offset = Some(offset as i64);
self.offset += write_column_index(&mut self.writer, pages)?;
let length = self.offset - offset;
column.column_index_length = Some(length as i32);
Result::Ok(())
},
)?;
Result::Ok(())
})?;
};
self.row_groups
.iter_mut()
.zip(self.page_specs.iter())
.try_for_each(|(group, pages)| {
group
.columns
.iter_mut()
.zip(pages.iter())
.try_for_each(|(column, pages)| {
let offset = self.offset;
column.offset_index_offset = Some(offset as i64);
self.offset += write_offset_index(&mut self.writer, pages)?;
column.offset_index_length = Some((self.offset - offset) as i32);
Result::Ok(())
})?;
Result::Ok(())
})?;
let metadata = ThriftFileMetaData::new(
self.options.version.into(),
self.schema.clone().into_thrift(),
num_rows,
self.row_groups.clone(),
key_value_metadata,
self.created_by.clone(),
None,
None,
None,
);
let len = end_file(&mut self.writer, &metadata)?;
self.state = State::Finished;
self.metadata = Some(metadata);
Ok(self.offset + len)
}
pub fn into_inner(self) -> W {
self.writer
}
pub fn into_inner_and_metadata(self) -> (W, ThriftFileMetaData) {
(self.writer, self.metadata.expect("File to have ended"))
}
}
#[cfg(test)]
mod tests {
    use std::{fs::File, io::Cursor};

    use super::*;
    use crate::error::Result;
    use crate::read::read_metadata;
    use crate::tests::get_path;

    /// Writes a file consisting only of the magic bytes and a footer whose
    /// metadata has no row groups. Checks that the reported byte counts match
    /// the bytes produced, and that the file reads back as empty.
    #[test]
    fn empty_file() -> Result<()> {
        let mut testdata = get_path();
        testdata.push("alltypes_plain.parquet");
        let mut file = File::open(testdata).unwrap();
        let mut metadata = read_metadata(&mut file)?;
        // Drop all row groups so the metadata describes an empty file.
        metadata.row_groups = vec![];
        metadata.num_rows = 0;

        let mut writer = Cursor::new(vec![]);
        let written = start_file(&mut writer)? + end_file(&mut writer, &metadata.into_thrift())?;
        let bytes = writer.into_inner();
        // The byte counts returned by the writers must match what was produced.
        assert_eq!(written, bytes.len() as u64);

        let result = read_metadata(&mut Cursor::new(bytes))?;
        assert_eq!(result.num_rows, 0);
        assert!(result.row_groups.is_empty());
        Ok(())
    }
}