mz_storage/source/generator/
datums.rs
1use std::iter;
11
12use mz_ore::now::NowFn;
13use mz_repr::{Datum, Diff, Row, ScalarType};
14use mz_storage_types::sources::MzOffset;
15use mz_storage_types::sources::load_generator::{Event, Generator, LoadGeneratorOutput};
16
17pub struct Datums {}
18
19impl Generator for Datums {
23 fn by_seed(
24 &self,
25 _: NowFn,
26 _seed: Option<u64>,
27 _resume_offset: MzOffset,
28 ) -> Box<(dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, Diff)>)>)>
29 {
30 let typs = ScalarType::enumerate();
31 let mut datums: Vec<Vec<Datum>> = typs
32 .iter()
33 .map(|typ| typ.interesting_datums().collect())
34 .collect();
35 let len = datums.iter().map(|v| v.len()).max().unwrap();
36 for dats in datums.iter_mut() {
39 while dats.len() < (len + 1) {
40 dats.push(Datum::Null);
41 }
42 }
43 datums.insert(
45 0,
46 (1..=len + 1)
47 .map(|i| Datum::Int64(i64::try_from(i).expect("must fit")))
48 .collect(),
49 );
50 let mut idx = 0;
51 let mut offset = 0;
52 Box::new(
53 iter::from_fn(move || {
54 if idx == len {
55 return None;
56 }
57 let mut row = Row::with_capacity(datums.len());
58 let mut packer = row.packer();
59 for d in &datums {
60 packer.push(d[idx]);
61 }
62 let msg = (
63 LoadGeneratorOutput::Default,
64 Event::Message(MzOffset::from(offset), (row, Diff::ONE)),
65 );
66
67 idx += 1;
68 let progress = if idx == len {
69 offset += 1;
70 Some((
71 LoadGeneratorOutput::Default,
72 Event::Progress(Some(MzOffset::from(offset))),
73 ))
74 } else {
75 None
76 };
77 Some(std::iter::once(msg).chain(progress))
78 })
79 .flatten(),
80 )
81 }
82}