mz_storage/source/generator/
datums.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::iter;
11
12use mz_ore::now::NowFn;
13use mz_repr::{Datum, Diff, Row, ScalarType};
14use mz_storage_types::sources::MzOffset;
15use mz_storage_types::sources::load_generator::{Event, Generator, LoadGeneratorOutput};
16
17pub struct Datums {}
18
19// Note that this generator never issues retractions; if you change this,
20// `mz_storage_types::sources::LoadGenerator::is_monotonic`
21// must be updated.
22impl Generator for Datums {
23    fn by_seed(
24        &self,
25        _: NowFn,
26        _seed: Option<u64>,
27        _resume_offset: MzOffset,
28    ) -> Box<dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, Diff)>)>> {
29        let typs = ScalarType::enumerate();
30        let mut datums: Vec<Vec<Datum>> = typs
31            .iter()
32            .map(|typ| typ.interesting_datums().collect())
33            .collect();
34        let len = datums.iter().map(|v| v.len()).max().unwrap();
35        // Fill in NULLs for shorter types, and append at least 1 NULL onto
36        // every type.
37        for dats in datums.iter_mut() {
38            while dats.len() < (len + 1) {
39                dats.push(Datum::Null);
40            }
41        }
42        // Put the rowid column at the start.
43        datums.insert(
44            0,
45            (1..=len + 1)
46                .map(|i| Datum::Int64(i64::try_from(i).expect("must fit")))
47                .collect(),
48        );
49        let mut idx = 0;
50        let mut offset = 0;
51        Box::new(
52            iter::from_fn(move || {
53                if idx == len {
54                    return None;
55                }
56                let mut row = Row::with_capacity(datums.len());
57                let mut packer = row.packer();
58                for d in &datums {
59                    packer.push(d[idx]);
60                }
61                let msg = (
62                    LoadGeneratorOutput::Default,
63                    Event::Message(MzOffset::from(offset), (row, Diff::ONE)),
64                );
65
66                idx += 1;
67                let progress = if idx == len {
68                    offset += 1;
69                    Some((
70                        LoadGeneratorOutput::Default,
71                        Event::Progress(Some(MzOffset::from(offset))),
72                    ))
73                } else {
74                    None
75                };
76                Some(std::iter::once(msg).chain(progress))
77            })
78            .flatten(),
79        )
80    }
81}