1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use parquet2::page::Page;

use crate::{array::NullArray, datatypes::DataType};

use super::super::{ArrayIter, Pages};

/// Converts [`Pages`] to an [`ArrayIter`]
pub fn iter_to_arrays<'a, I>(
    mut iter: I,
    data_type: DataType,
    chunk_size: Option<usize>,
    num_rows: usize,
) -> ArrayIter<'a>
where
    I: 'a + Pages,
{
    let mut len = 0usize;

    while let Ok(Some(page)) = iter.next() {
        match page {
            Page::Dict(_) => continue,
            Page::Data(page) => {
                let rows = page.num_values();
                len = (len + rows).min(num_rows);
                if len == num_rows {
                    break;
                }
            }
        }
    }

    if len == 0 {
        return Box::new(std::iter::empty());
    }

    let chunk_size = chunk_size.unwrap_or(len);

    let complete_chunks = len / chunk_size;

    let remainder = len - (complete_chunks * chunk_size);
    let i_data_type = data_type.clone();
    let complete = (0..complete_chunks)
        .map(move |_| Ok(NullArray::new(i_data_type.clone(), chunk_size).boxed()));
    if len % chunk_size == 0 {
        Box::new(complete)
    } else {
        let array = NullArray::new(data_type, remainder);
        Box::new(complete.chain(std::iter::once(Ok(array.boxed()))))
    }
}

#[cfg(test)]
mod tests {
    use parquet2::{
        encoding::Encoding,
        error::Error as ParquetError,
        metadata::Descriptor,
        page::{DataPage, DataPageHeader, DataPageHeaderV1, Page},
        schema::types::{PhysicalType, PrimitiveType},
    };

    use crate::{array::NullArray, datatypes::DataType, error::Error};

    use super::iter_to_arrays;

    #[test]
    fn limit() {
        let new_page = |values: i32| {
            Page::Data(DataPage::new(
                DataPageHeader::V1(DataPageHeaderV1 {
                    num_values: values,
                    encoding: Encoding::Plain.into(),
                    definition_level_encoding: Encoding::Plain.into(),
                    repetition_level_encoding: Encoding::Plain.into(),
                    statistics: None,
                }),
                vec![],
                Descriptor {
                    primitive_type: PrimitiveType::from_physical(
                        "a".to_string(),
                        PhysicalType::Int32,
                    ),
                    max_def_level: 0,
                    max_rep_level: 0,
                },
                None,
            ))
        };

        let p1 = new_page(100);
        let p2 = new_page(100);
        let pages = vec![Result::<_, ParquetError>::Ok(&p1), Ok(&p2)];
        let pages = fallible_streaming_iterator::convert(pages.into_iter());
        let arrays = iter_to_arrays(pages, DataType::Null, Some(10), 101);

        let arrays = arrays.collect::<Result<Vec<_>, Error>>().unwrap();
        let expected = std::iter::repeat(NullArray::new(DataType::Null, 10).boxed())
            .take(10)
            .chain(std::iter::once(NullArray::new(DataType::Null, 1).boxed()));
        assert_eq!(arrays, expected.collect::<Vec<_>>())
    }
}