mz_storage/source/generator/
clock.rs
1use mz_ore::cast::CastFrom;
11use mz_ore::now::NowFn;
12use mz_repr::{Datum, Diff, Row};
13use mz_storage_types::sources::MzOffset;
14use mz_storage_types::sources::load_generator::{Event, Generator, LoadGeneratorOutput};
15use std::{iter, mem};
16
17pub struct Clock {
18 pub tick_ms: u64,
19 pub as_of_ms: u64,
20}
21
22impl Generator for Clock {
23 fn by_seed(
24 &self,
25 now: NowFn,
26 _seed: Option<u64>,
27 mut resume_offset: MzOffset,
28 ) -> Box<dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, Diff)>)>> {
29 let interval_ms = self.tick_ms;
30 let floor = move |t| t / interval_ms * interval_ms;
31 let first_tick = floor(self.as_of_ms);
32 resume_offset = resume_offset.max(MzOffset::from(first_tick));
33
34 Box::new(
35 iter::from_fn(move || {
36 let next_offset = MzOffset::from(now() + 1).max(resume_offset);
37 let prev_offset = mem::replace(&mut resume_offset, next_offset);
38 Some((prev_offset, next_offset))
39 })
40 .flat_map(move |(lower, upper)| {
41 let row = move |tick: u64| {
42 let now_dt = mz_ore::now::to_datetime(tick)
43 .try_into()
44 .expect("system time out of bounds");
45 Row::pack_slice(&[Datum::TimestampTz(now_dt)])
46 };
47
48 let messages = (floor(lower.offset)..=floor(upper.offset))
49 .step_by(usize::cast_from(interval_ms))
50 .filter(move |&tick| lower.offset <= tick && tick < upper.offset)
51 .flat_map(move |at_offset| {
52 let last_offset = at_offset
53 .checked_sub(interval_ms)
54 .filter(move |&t| t >= first_tick);
55 [last_offset.map(|t| (t, -1)), Some((at_offset, 1))]
56 .into_iter()
57 .flatten()
58 .map(move |(time, diff)| {
59 Event::Message(MzOffset::from(at_offset), (row(time), diff.into()))
60 })
61 });
62 let progress = iter::once(Event::Progress(Some(upper)));
63
64 messages
65 .chain(progress)
66 .map(|e| (LoadGeneratorOutput::Default, e))
67 }),
68 )
69 }
70}