mz_storage/source/generator/
counter.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use mz_ore::now::NowFn;
11use mz_repr::{Datum, Diff, Row};
12use mz_storage_types::sources::MzOffset;
13use mz_storage_types::sources::load_generator::{Event, Generator, LoadGeneratorOutput};
14
/// Load generator that emits consecutive integers, one new value per offset.
///
/// The value inserted at offset `o` is `o + 1`. When `max_cardinality` is set,
/// values that fall out of the window are retracted again, so the collection
/// does not grow without bound.
pub struct Counter {
    /// How many values will be emitted before old ones are retracted,
    /// or `None` for an append-only collection.  (If this retraction
    /// behavior is changed,
    /// `mz_storage_types::sources::LoadGenerator::is_monotonic`
    /// must be updated.)
    pub max_cardinality: Option<u64>,
}
23
24impl Generator for Counter {
25    fn by_seed(
26        &self,
27        _now: NowFn,
28        _seed: Option<u64>,
29        resume_offset: MzOffset,
30    ) -> Box<dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, Diff)>)>> {
31        let max_cardinality = self.max_cardinality;
32
33        Box::new(
34            (resume_offset.offset..)
35                .map(move |offset| {
36                    let retraction = match max_cardinality {
37                        // At offset `max` we must start retracting the value of `offset - max`. For
38                        // example if max_cardinality is 2 then the collection should contain:
39                        // (1, 0, +1)
40                        // (2, 1, +1)
41                        // (1, 2, -1) <- Here offset becomes >= max and we retract the value that was
42                        // (3, 2, +1)    emitted at (offset - max), which equals (offset - max + 1).
43                        // (2, 3, -1)
44                        // (4, 3, +1)
45                        Some(max) if offset >= max => {
46                            let retracted_value = i64::try_from(offset - max + 1).unwrap();
47                            let row = Row::pack_slice(&[Datum::Int64(retracted_value)]);
48                            Some((
49                                LoadGeneratorOutput::Default,
50                                Event::Message(MzOffset::from(offset), (row, Diff::MINUS_ONE)),
51                            ))
52                        }
53                        _ => None,
54                    };
55
56                    let inserted_value = i64::try_from(offset + 1).unwrap();
57                    let row = Row::pack_slice(&[Datum::Int64(inserted_value)]);
58                    let insertion = [
59                        (
60                            LoadGeneratorOutput::Default,
61                            Event::Message(MzOffset::from(offset), (row, Diff::ONE)),
62                        ),
63                        (
64                            LoadGeneratorOutput::Default,
65                            Event::Progress(Some(MzOffset::from(offset + 1))),
66                        ),
67                    ];
68                    retraction.into_iter().chain(insertion)
69                })
70                .flatten(),
71        )
72    }
73}