use std::fmt::Debug;
use std::io::{Read, Seek, Write};
use anyhow::anyhow;
use arrow2::datatypes::Schema as ArrowSchema;
use arrow2::io::parquet::read::{infer_schema, read_metadata, FileReader};
use arrow2::io::parquet::write::{
row_group_iter, to_parquet_schema, CompressionOptions, Version, WriteOptions,
};
use parquet2::write::{DynIter, FileWriter, WriteOptions as ParquetWriteOptions};
use crate::codec_impls::UnitSchema;
use crate::columnar::{PartDecoder, Schema};
use crate::part::{Part, PartBuilder};
/// Encodes `part` as a Parquet file and writes it to `w`.
///
/// The part is converted to its Arrow representation via [`Part::to_arrow`]
/// and written as a single row group using Parquet format version 2, with no
/// compression and with statistics disabled.
///
/// # Errors
///
/// Returns an error if the Arrow schema cannot be converted into a Parquet
/// schema, or if writing the row group or finishing the file fails.
pub fn encode_part<W: Write>(w: &mut W, part: &Part) -> Result<(), anyhow::Error> {
    let (fields, encodings, chunk) = part.to_arrow();
    let schema = ArrowSchema::from(fields);
    let parquet_schema = to_parquet_schema(&schema)?;
    let options = WriteOptions {
        write_statistics: false,
        compression: CompressionOptions::Uncompressed,
        version: Version::V2,
        data_pagesize_limit: None,
    };

    // Copy out the column types needed by the row group before the schema
    // descriptor is moved into the writer, so the descriptor itself does not
    // have to be cloned.
    let row_group_fields = parquet_schema.fields().to_vec();

    let mut writer = FileWriter::new(
        w,
        parquet_schema,
        ParquetWriteOptions {
            version: options.version,
            write_statistics: options.write_statistics,
        },
        // No `created_by` string is recorded.
        None,
    );

    let row_group = DynIter::new(row_group_iter(
        chunk,
        encodings,
        row_group_fields,
        options,
    ));
    writer.write(row_group)?;

    // Finish the file with an empty list of extra metadata entries.
    writer.end(Some(Vec::new()))?;
    Ok(())
}
/// Decodes a [`Part`] from Parquet data read from `r`.
///
/// The file metadata is read first, an Arrow schema is inferred from it, and
/// the first chunk yielded by the reader is converted into a `Part` using
/// `key_schema` and `val_schema`. The data is required to contain exactly one
/// chunk.
///
/// # Errors
///
/// Returns an error if:
/// - the metadata cannot be read or the schema cannot be inferred,
/// - the reader yields no chunk at all ("not enough chunks in part"),
/// - reading the first chunk fails,
/// - [`Part::from_arrow`] rejects the chunk, or
/// - the reader yields anything after the first chunk
///   ("too many chunks in part").
pub fn decode_part<R: Read + Seek, K, KS: Schema<K>, V, VS: Schema<V>>(
    r: &mut R,
    key_schema: &KS,
    val_schema: &VS,
) -> Result<Part, anyhow::Error> {
    let metadata = read_metadata(r)?;
    let schema = infer_schema(&metadata)?;
    let mut reader = FileReader::new(r, metadata.row_groups, schema, None, None, None);

    let chunk = reader
        .next()
        .ok_or_else(|| anyhow!("not enough chunks in part"))?
        .map_err(anyhow::Error::new)?;
    let part = Part::from_arrow(key_schema, val_schema, chunk).map_err(anyhow::Error::msg)?;

    // Any further item from the reader (successful or not) means the data
    // holds more than the single chunk a part is expected to have.
    if reader.next().is_some() {
        return Err(anyhow!("too many chunks in part"));
    }
    Ok(part)
}
/// Checks that `value` survives an encode/decode round trip under `schema`.
///
/// A one-row part is built with `value` as the key and `()` as the value
/// (using [`UnitSchema`]), encoded with [`encode_part`] into an in-memory
/// buffer, decoded again with [`decode_part`], and the key at index 0 is
/// decoded into a fresh `T::default()` and compared against `value`.
///
/// # Errors
///
/// Returns a `String` describing the failure if building the part, encoding,
/// decoding, or constructing the key decoder fails, or if the decoded value
/// is not equal to `value`.
///
/// # Panics
///
/// Panics if key statistics cannot be computed for the built part, or if the
/// decoded part does not contain exactly one row.
pub fn validate_roundtrip<T: Default + PartialEq + Debug, S: Schema<T>>(
    schema: &S,
    value: &T,
) -> Result<(), String> {
    // Build a part holding just the one value under test.
    let mut builder = PartBuilder::new(schema, &UnitSchema)?;
    builder.push(value, &(), 1u64, 1i64);
    let original_part = builder.finish();

    // The stats themselves are not inspected; only that computing them works.
    let _stats = original_part
        .key_stats()
        .expect("stats should be compute-able");

    // Round-trip through the Parquet encoding via an in-memory buffer.
    let mut bytes = Vec::new();
    encode_part(&mut bytes, &original_part).map_err(|e| e.to_string())?;
    let mut cursor = std::io::Cursor::new(&bytes);
    let decoded_part =
        decode_part(&mut cursor, schema, &UnitSchema).map_err(|e| e.to_string())?;

    // Decode the single key back out and compare it with the input.
    let mut actual = T::default();
    assert_eq!(decoded_part.len(), 1);
    let keys = decoded_part.key_ref();
    schema.decoder(keys)?.decode(0, &mut actual);

    if actual != *value {
        return Err(format!(
            "validate_roundtrip expected {:?} but got {:?}",
            value, actual
        ));
    }
    Ok(())
}