mz_storage/source/generator/counter.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use mz_ore::now::NowFn;
11use mz_repr::{Datum, Diff, Row};
12use mz_storage_types::sources::MzOffset;
13use mz_storage_types::sources::load_generator::{Event, Generator, LoadGeneratorOutput};
14
15pub struct Counter {
16 /// How many values will be emitted before old ones are retracted,
17 /// or `None` for an append-only collection. (If this retraction
18 /// behavior is changed,
19 /// `mz_storage_types::sources::LoadGenerator::is_monotonic`
20 /// must be updated.
21 pub max_cardinality: Option<u64>,
22}
23
24impl Generator for Counter {
25 fn by_seed(
26 &self,
27 _now: NowFn,
28 _seed: Option<u64>,
29 resume_offset: MzOffset,
30 ) -> Box<dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, Diff)>)>> {
31 let max_cardinality = self.max_cardinality;
32
33 Box::new(
34 (resume_offset.offset..)
35 .map(move |offset| {
36 let retraction = match max_cardinality {
37 // At offset `max` we must start retracting the value of `offset - max`. For
38 // example if max_cardinality is 2 then the collection should contain:
39 // (1, 0, +1)
40 // (2, 1, +1)
41 // (1, 2, -1) <- Here offset becomes >= max and we retract the value that was
42 // (3, 2, +1) emitted at (offset - max), which equals (offset - max + 1).
43 // (2, 3, -1)
44 // (4, 3, +1)
45 Some(max) if offset >= max => {
46 let retracted_value = i64::try_from(offset - max + 1).unwrap();
47 let row = Row::pack_slice(&[Datum::Int64(retracted_value)]);
48 Some((
49 LoadGeneratorOutput::Default,
50 Event::Message(MzOffset::from(offset), (row, Diff::MINUS_ONE)),
51 ))
52 }
53 _ => None,
54 };
55
56 let inserted_value = i64::try_from(offset + 1).unwrap();
57 let row = Row::pack_slice(&[Datum::Int64(inserted_value)]);
58 let insertion = [
59 (
60 LoadGeneratorOutput::Default,
61 Event::Message(MzOffset::from(offset), (row, Diff::ONE)),
62 ),
63 (
64 LoadGeneratorOutput::Default,
65 Event::Progress(Some(MzOffset::from(offset + 1))),
66 ),
67 ];
68 retraction.into_iter().chain(insertion)
69 })
70 .flatten(),
71 )
72 }
73}