mz_storage/source/generator/
datums.rs1use std::iter;
11
12use mz_ore::now::NowFn;
13use mz_repr::{Datum, Diff, Row, ScalarType};
14use mz_storage_types::sources::MzOffset;
15use mz_storage_types::sources::load_generator::{Event, Generator, LoadGeneratorOutput};
16
/// Stateless load generator; all of its behavior lives in its [`Generator`]
/// implementation.
pub struct Datums {}
18
19impl Generator for Datums {
23 fn by_seed(
24 &self,
25 _: NowFn,
26 _seed: Option<u64>,
27 _resume_offset: MzOffset,
28 ) -> Box<dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, Diff)>)>> {
29 let typs = ScalarType::enumerate();
30 let mut datums: Vec<Vec<Datum>> = typs
31 .iter()
32 .map(|typ| typ.interesting_datums().collect())
33 .collect();
34 let len = datums.iter().map(|v| v.len()).max().unwrap();
35 for dats in datums.iter_mut() {
38 while dats.len() < (len + 1) {
39 dats.push(Datum::Null);
40 }
41 }
42 datums.insert(
44 0,
45 (1..=len + 1)
46 .map(|i| Datum::Int64(i64::try_from(i).expect("must fit")))
47 .collect(),
48 );
49 let mut idx = 0;
50 let mut offset = 0;
51 Box::new(
52 iter::from_fn(move || {
53 if idx == len {
54 return None;
55 }
56 let mut row = Row::with_capacity(datums.len());
57 let mut packer = row.packer();
58 for d in &datums {
59 packer.push(d[idx]);
60 }
61 let msg = (
62 LoadGeneratorOutput::Default,
63 Event::Message(MzOffset::from(offset), (row, Diff::ONE)),
64 );
65
66 idx += 1;
67 let progress = if idx == len {
68 offset += 1;
69 Some((
70 LoadGeneratorOutput::Default,
71 Event::Progress(Some(MzOffset::from(offset))),
72 ))
73 } else {
74 None
75 };
76 Some(std::iter::once(msg).chain(progress))
77 })
78 .flatten(),
79 )
80 }
81}