1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::iter;

use mz_ore::now::NowFn;
use mz_repr::{Datum, Row, ScalarType};
use mz_storage_types::sources::load_generator::{Event, Generator};
use mz_storage_types::sources::MzOffset;

pub struct Datums {}

// Note that this generator never issues retractions; if you change this,
// `mz_storage_types::sources::LoadGenerator::is_monotonic`
// must be updated.
impl Generator for Datums {
    fn by_seed(
        &self,
        _: NowFn,
        _seed: Option<u64>,
        _resume_offset: MzOffset,
    ) -> Box<(dyn Iterator<Item = (usize, Event<Option<MzOffset>, (Row, i64)>)>)> {
        let typs = ScalarType::enumerate();
        let mut datums: Vec<Vec<Datum>> = typs
            .iter()
            .map(|typ| typ.interesting_datums().collect())
            .collect();
        let len = datums.iter().map(|v| v.len()).max().unwrap();
        // Fill in NULLs for shorter types, and append at least 1 NULL onto
        // every type.
        for dats in datums.iter_mut() {
            while dats.len() < (len + 1) {
                dats.push(Datum::Null);
            }
        }
        // Put the rowid column at the start.
        datums.insert(
            0,
            (1..=len + 1)
                .map(|i| Datum::Int64(i64::try_from(i).expect("must fit")))
                .collect(),
        );
        let mut idx = 0;
        let mut offset = 0;
        Box::new(
            iter::from_fn(move || {
                if idx == len {
                    return None;
                }
                let mut row = Row::with_capacity(datums.len());
                let mut packer = row.packer();
                for d in &datums {
                    packer.push(d[idx]);
                }
                let msg = (0, Event::Message(MzOffset::from(offset), (row, 1)));

                idx += 1;
                let progress = if idx == len {
                    offset += 1;
                    Some((0, Event::Progress(Some(MzOffset::from(offset)))))
                } else {
                    None
                };
                Some(std::iter::once(msg).chain(progress))
            })
            .flatten(),
        )
    }
}