mz_storage/source/generator/
datums.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::iter;
11
12use mz_ore::now::NowFn;
13use mz_repr::{Datum, Diff, Row, ScalarType};
14use mz_storage_types::sources::MzOffset;
15use mz_storage_types::sources::load_generator::{Event, Generator, LoadGeneratorOutput};
16
/// Load generator that emits rows built from the "interesting" datums of
/// every [`ScalarType`].
///
/// The struct carries no configuration; all of its behavior lives in its
/// [`Generator`] implementation.
pub struct Datums {}
18
19// Note that this generator never issues retractions; if you change this,
20// `mz_storage_types::sources::LoadGenerator::is_monotonic`
21// must be updated.
22impl Generator for Datums {
23    fn by_seed(
24        &self,
25        _: NowFn,
26        _seed: Option<u64>,
27        _resume_offset: MzOffset,
28    ) -> Box<(dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, Diff)>)>)>
29    {
30        let typs = ScalarType::enumerate();
31        let mut datums: Vec<Vec<Datum>> = typs
32            .iter()
33            .map(|typ| typ.interesting_datums().collect())
34            .collect();
35        let len = datums.iter().map(|v| v.len()).max().unwrap();
36        // Fill in NULLs for shorter types, and append at least 1 NULL onto
37        // every type.
38        for dats in datums.iter_mut() {
39            while dats.len() < (len + 1) {
40                dats.push(Datum::Null);
41            }
42        }
43        // Put the rowid column at the start.
44        datums.insert(
45            0,
46            (1..=len + 1)
47                .map(|i| Datum::Int64(i64::try_from(i).expect("must fit")))
48                .collect(),
49        );
50        let mut idx = 0;
51        let mut offset = 0;
52        Box::new(
53            iter::from_fn(move || {
54                if idx == len {
55                    return None;
56                }
57                let mut row = Row::with_capacity(datums.len());
58                let mut packer = row.packer();
59                for d in &datums {
60                    packer.push(d[idx]);
61                }
62                let msg = (
63                    LoadGeneratorOutput::Default,
64                    Event::Message(MzOffset::from(offset), (row, Diff::ONE)),
65                );
66
67                idx += 1;
68                let progress = if idx == len {
69                    offset += 1;
70                    Some((
71                        LoadGeneratorOutput::Default,
72                        Event::Progress(Some(MzOffset::from(offset))),
73                    ))
74                } else {
75                    None
76                };
77                Some(std::iter::once(msg).chain(progress))
78            })
79            .flatten(),
80        )
81    }
82}