// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Parquet serialization and deserialization for persist data.
//!
//! TODO: Move this into mz_persist_client::internal once we don't need
//! [validate_roundtrip].

use std::fmt::Debug;
use std::io::{Read, Seek, Write};

use anyhow::anyhow;
use arrow2::datatypes::Schema as ArrowSchema;
use arrow2::io::parquet::read::{infer_schema, read_metadata, FileReader};
use arrow2::io::parquet::write::{
    row_group_iter, to_parquet_schema, CompressionOptions, Version, WriteOptions,
};
use parquet2::write::{DynIter, FileWriter, WriteOptions as ParquetWriteOptions};

use crate::codec_impls::UnitSchema;
use crate::columnar::{PartDecoder, Schema};
use crate::part::{Part, PartBuilder};

/// Encodes the given part into our parquet-based serialization format.
///
/// It doesn't particularly gain us anything to use more than one "chunk" per
/// blob, and it's simpler to only have one, so do that.
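///
/// A minimal usage sketch (the `part` value is assumed to come from a
/// `PartBuilder`; not compiled as a doctest):
///
/// ```ignore
/// let mut encoded = Vec::new();
/// encode_part(&mut encoded, &part)?;
/// ```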
pub fn encode_part<W: Write>(w: &mut W, part: &Part) -> Result<(), anyhow::Error> {
    // No additional file-level key-value metadata.
    let metadata = Vec::new();
    let (fields, encodings, chunk) = part.to_arrow();

    let schema = ArrowSchema::from(fields);
    // Construct a FileWriter manually so we can omit the created_by string and
    // (redundant) arrow schema.
    let parquet_schema = to_parquet_schema(&schema)?;
    let options = WriteOptions {
        write_statistics: false,
        compression: CompressionOptions::Uncompressed,
        version: Version::V2,
        data_pagesize_limit: None, // use default limit
    };
    let created_by = None;
    let mut writer = FileWriter::new(
        w,
        parquet_schema.clone(),
        ParquetWriteOptions {
            version: options.version,
            write_statistics: options.write_statistics,
        },
        created_by,
    );

    let row_group = DynIter::new(row_group_iter(
        chunk,
        encodings,
        parquet_schema.fields().to_vec(),
        options,
    ));
    writer.write(row_group)?;
    writer.end(Some(metadata))?;
    Ok(())
}

/// Decodes a part with the given schema from our parquet-based serialization
/// format.
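///
/// A minimal usage sketch (assumes `encoded` holds bytes produced by
/// [encode_part] and that the schemas match what was written; not compiled
/// as a doctest):
///
/// ```ignore
/// let mut r = std::io::Cursor::new(&encoded);
/// let part = decode_part(&mut r, &key_schema, &val_schema)?;
/// ```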
pub fn decode_part<R: Read + Seek, K, KS: Schema<K>, V, VS: Schema<V>>(
    r: &mut R,
    key_schema: &KS,
    val_schema: &VS,
) -> Result<Part, anyhow::Error> {
    let metadata = read_metadata(r)?;
    let schema = infer_schema(&metadata)?;
    let mut reader = FileReader::new(r, metadata.row_groups, schema, None, None, None);

    // encode_part documents that there is exactly one chunk in every blob.
    // Verify that here by ensuring the first call to `next` is Some and the
    // second call to it is None.
    let chunk = reader
        .next()
        .ok_or_else(|| anyhow!("not enough chunks in part"))?
        .map_err(anyhow::Error::new)?;
    let part = Part::from_arrow(key_schema, val_schema, chunk).map_err(anyhow::Error::msg)?;

    if reader.next().is_some() {
        return Err(anyhow!("too many chunks in part"));
    }

    Ok(part)
}

/// A helper for writing tests that validate that a piece of data roundtrips
/// through the parquet serialization format.
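///
/// A minimal usage sketch (`MySchema` and `MyType` are hypothetical
/// stand-ins for a `Schema<T>` impl and its row type; not compiled as a
/// doctest):
///
/// ```ignore
/// validate_roundtrip(&MySchema, &MyType::default())?;
/// ```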
pub fn validate_roundtrip<T: Default + PartialEq + Debug, S: Schema<T>>(
    schema: &S,
    value: &T,
) -> Result<(), String> {
    let mut builder = PartBuilder::new(schema, &UnitSchema)?;
    builder.push(value, &(), 1u64, 1i64);
    let part = builder.finish();

    // Sanity check that we can compute stats.
    let _stats = part.key_stats().expect("stats should be compute-able");

    let mut encoded = Vec::new();
    let () = encode_part(&mut encoded, &part).map_err(|err| err.to_string())?;
    let part = decode_part(&mut std::io::Cursor::new(&encoded), schema, &UnitSchema)
        .map_err(|err| err.to_string())?;

    let mut actual = T::default();
    assert_eq!(part.len(), 1);
    let part = part.key_ref();
    schema.decoder(part)?.decode(0, &mut actual);
    if &actual != value {
        Err(format!(
            "validate_roundtrip expected {:?} but got {:?}",
            value, actual
        ))
    } else {
        Ok(())
    }
}