1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
//! APIs to read from Parquet format.
#![allow(clippy::type_complexity)]

mod deserialize;
mod file;
pub mod indexes;
mod row_group;
pub mod schema;
pub mod statistics;

use std::io::{Read, Seek};

use futures::{AsyncRead, AsyncSeek};

// re-exports of parquet2's relevant APIs
pub use parquet2::{
    error::Error as ParquetError,
    fallible_streaming_iterator,
    metadata::{ColumnChunkMetaData, ColumnDescriptor, RowGroupMetaData},
    page::{CompressedDataPage, DataPageHeader, Page},
    read::{
        decompress, get_column_iterator, get_page_stream,
        read_columns_indexes as _read_columns_indexes, read_metadata as _read_metadata,
        read_metadata_async as _read_metadata_async, read_pages_locations, BasicDecompressor,
        Decompressor, MutStreamingIterator, PageFilter, PageReader, ReadColumnIterator, State,
    },
    schema::types::{
        GroupLogicalType, ParquetType, PhysicalType, PrimitiveConvertedType, PrimitiveLogicalType,
        TimeUnit as ParquetTimeUnit,
    },
    types::int96_to_i64_ns,
    FallibleStreamingIterator,
};

use crate::{array::Array, error::Result};

pub use deserialize::{
    column_iter_to_arrays, create_list, get_page_iterator, init_nested, n_columns, InitNested,
    NestedArrayIter, NestedState, StructIterator,
};
pub use file::{FileReader, RowGroupReader};
pub use row_group::*;
pub use schema::{infer_schema, FileMetaData};

/// Trait describing a [`FallibleStreamingIterator`] of [`Page`]
pub trait Pages:
    FallibleStreamingIterator<Item = Page, Error = ParquetError> + Send + Sync
{
}

impl<I: FallibleStreamingIterator<Item = Page, Error = ParquetError> + Send + Sync> Pages for I {}

/// Type alias for a boxed, dynamically dispatched [`Iterator`] whose items are
/// `Result<Box<dyn Array>>`.
///
/// The trait object is required to be `Send + Sync`, and it may borrow data for
/// the lifetime `'a`.
pub type ArrayIter<'a> = Box<dyn Iterator<Item = Result<Box<dyn Array>>> + Send + Sync + 'a>;

/// Reads a Parquet file's metadata synchronously from `reader`.
///
/// This is a thin wrapper over parquet2's `read_metadata`.
///
/// # Errors
///
/// Returns any error reported by parquet2, converted into this crate's error type.
pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
    // `Into::into` performs the same error conversion that `?` would apply.
    _read_metadata(reader).map_err(Into::into)
}

/// Reads a Parquet file's metadata asynchronously from `reader`.
///
/// This is a thin wrapper over parquet2's `read_metadata_async`.
///
/// # Errors
///
/// Returns any error reported by parquet2, converted into this crate's error type.
pub async fn read_metadata_async<R: AsyncRead + AsyncSeek + Send + Unpin>(
    reader: &mut R,
) -> Result<FileMetaData> {
    let outcome = _read_metadata_async(reader).await;
    // `Into::into` performs the same error conversion that `?` would apply.
    outcome.map_err(Into::into)
}

/// Builds a `days_ms` from a raw byte slice.
///
/// Bytes `4..8` and `8..12` are each decoded as a little-endian `i32` and passed,
/// in that order, to the `days_ms` constructor. Bytes `0..4` are not read (in
/// Parquet's 12-byte INTERVAL layout they hold the month count).
///
/// # Panics
///
/// Panics if `value` is shorter than 12 bytes.
fn convert_days_ms(value: &[u8]) -> crate::types::days_ms {
    // Decodes the 4 bytes starting at `offset` as a little-endian `i32`.
    let read_i32_le = |offset: usize| -> i32 {
        let raw: [u8; 4] = value[offset..offset + 4].try_into().unwrap();
        i32::from_le_bytes(raw)
    };

    let days = read_i32_le(4);
    let milliseconds = read_i32_le(8);
    crate::types::days_ms(days, milliseconds)
}

/// Decodes an `n`-byte, big-endian, two's-complement integer into an [`i128`].
///
/// `value` must contain exactly `n` bytes and `n` must not exceed 16. An empty
/// input (`n == 0`) decodes to `0`.
///
/// # Panics
///
/// Panics if `n > 16` or if `value.len() != n`.
fn convert_i128(value: &[u8], n: usize) -> i128 {
    // Copy the fixed-size byte value to the start of a 16 byte stack
    // allocated buffer, so that its sign bit lands in the buffer's MSB.
    let mut bytes = [0u8; 16];
    bytes[..n].copy_from_slice(value);

    // With no input bytes the shift below would be by 128 bits, which
    // overflows (a panic when overflow checks are enabled). There is no sign
    // bit to extend, so the value is simply zero.
    if n == 0 {
        return 0;
    }

    // An arithmetic right shift moves the value down to the low-order bytes
    // and fills in the MSBs, which accounts for leading 1's in negative
    // (two's complement) values.
    i128::from_be_bytes(bytes) >> (8 * (16 - n))
}