mz_storage/source/generator/
clock.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use mz_ore::cast::CastFrom;
11use mz_ore::now::NowFn;
12use mz_repr::{Datum, Diff, Row};
13use mz_storage_types::sources::MzOffset;
14use mz_storage_types::sources::load_generator::{Event, Generator, LoadGeneratorOutput};
15use std::{iter, mem};
16
17pub struct Clock {
18    pub tick_ms: u64,
19    pub as_of_ms: u64,
20}
21
22impl Generator for Clock {
23    fn by_seed(
24        &self,
25        now: NowFn,
26        _seed: Option<u64>,
27        mut resume_offset: MzOffset,
28    ) -> Box<dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, Diff)>)>> {
29        let interval_ms = self.tick_ms;
30        let floor = move |t| t / interval_ms * interval_ms;
31        let first_tick = floor(self.as_of_ms);
32        resume_offset = resume_offset.max(MzOffset::from(first_tick));
33
34        Box::new(
35            iter::from_fn(move || {
36                let next_offset = MzOffset::from(now() + 1).max(resume_offset);
37                let prev_offset = mem::replace(&mut resume_offset, next_offset);
38                Some((prev_offset, next_offset))
39            })
40            .flat_map(move |(lower, upper)| {
41                let row = move |tick: u64| {
42                    let now_dt = mz_ore::now::to_datetime(tick)
43                        .try_into()
44                        .expect("system time out of bounds");
45                    Row::pack_slice(&[Datum::TimestampTz(now_dt)])
46                };
47
48                let messages = (floor(lower.offset)..=floor(upper.offset))
49                    .step_by(usize::cast_from(interval_ms))
50                    .filter(move |&tick| lower.offset <= tick && tick < upper.offset)
51                    .flat_map(move |at_offset| {
52                        let last_offset = at_offset
53                            .checked_sub(interval_ms)
54                            .filter(move |&t| t >= first_tick);
55                        [last_offset.map(|t| (t, -1)), Some((at_offset, 1))]
56                            .into_iter()
57                            .flatten()
58                            .map(move |(time, diff)| {
59                                Event::Message(MzOffset::from(at_offset), (row(time), diff.into()))
60                            })
61                    });
62                let progress = iter::once(Event::Progress(Some(upper)));
63
64                messages
65                    .chain(progress)
66                    .map(|e| (LoadGeneratorOutput::Default, e))
67            }),
68        )
69    }
70}