mz_storage/source/generator/counter.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use mz_ore::now::NowFn;
11use mz_repr::{Datum, Diff, Row};
12use mz_storage_types::sources::MzOffset;
13use mz_storage_types::sources::load_generator::{Event, Generator, LoadGeneratorOutput};
14
15pub struct Counter {
16 /// How many values will be emitted before old ones are retracted,
17 /// or `None` for an append-only collection. (If this retraction
18 /// behavior is changed,
19 /// `mz_storage_types::sources::LoadGenerator::is_monotonic`
20 /// must be updated.
21 pub max_cardinality: Option<u64>,
22}
23
24impl Generator for Counter {
25 fn by_seed(
26 &self,
27 _now: NowFn,
28 _seed: Option<u64>,
29 resume_offset: MzOffset,
30 ) -> Box<(dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, Diff)>)>)>
31 {
32 let max_cardinality = self.max_cardinality;
33
34 Box::new(
35 (resume_offset.offset..)
36 .map(move |offset| {
37 let retraction = match max_cardinality {
38 // At offset `max` we must start retracting the value of `offset - max`. For
39 // example if max_cardinality is 2 then the collection should contain:
40 // (1, 0, +1)
41 // (2, 1, +1)
42 // (1, 2, -1) <- Here offset becomes >= max and we retract the value that was
43 // (3, 2, +1) emitted at (offset - max), which equals (offset - max + 1).
44 // (2, 3, -1)
45 // (4, 3, +1)
46 Some(max) if offset >= max => {
47 let retracted_value = i64::try_from(offset - max + 1).unwrap();
48 let row = Row::pack_slice(&[Datum::Int64(retracted_value)]);
49 Some((
50 LoadGeneratorOutput::Default,
51 Event::Message(MzOffset::from(offset), (row, Diff::MINUS_ONE)),
52 ))
53 }
54 _ => None,
55 };
56
57 let inserted_value = i64::try_from(offset + 1).unwrap();
58 let row = Row::pack_slice(&[Datum::Int64(inserted_value)]);
59 let insertion = [
60 (
61 LoadGeneratorOutput::Default,
62 Event::Message(MzOffset::from(offset), (row, Diff::ONE)),
63 ),
64 (
65 LoadGeneratorOutput::Default,
66 Event::Progress(Some(MzOffset::from(offset + 1))),
67 ),
68 ];
69 retraction.into_iter().chain(insertion)
70 })
71 .flatten(),
72 )
73 }
74}