mz_storage/source/generator/
counter.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use mz_ore::now::NowFn;
11use mz_repr::{Datum, Diff, Row};
12use mz_storage_types::sources::MzOffset;
13use mz_storage_types::sources::load_generator::{Event, Generator, LoadGeneratorOutput};
14
15pub struct Counter {
16    /// How many values will be emitted before old ones are retracted,
17    /// or `None` for an append-only collection.  (If this retraction
18    /// behavior is changed,
19    /// `mz_storage_types::sources::LoadGenerator::is_monotonic`
20    /// must be updated.
21    pub max_cardinality: Option<u64>,
22}
23
24impl Generator for Counter {
25    fn by_seed(
26        &self,
27        _now: NowFn,
28        _seed: Option<u64>,
29        resume_offset: MzOffset,
30    ) -> Box<(dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, Diff)>)>)>
31    {
32        let max_cardinality = self.max_cardinality;
33
34        Box::new(
35            (resume_offset.offset..)
36                .map(move |offset| {
37                    let retraction = match max_cardinality {
38                        // At offset `max` we must start retracting the value of `offset - max`. For
39                        // example if max_cardinality is 2 then the collection should contain:
40                        // (1, 0, +1)
41                        // (2, 1, +1)
42                        // (1, 2, -1) <- Here offset becomes >= max and we retract the value that was
43                        // (3, 2, +1)    emitted at (offset - max), which equals (offset - max + 1).
44                        // (2, 3, -1)
45                        // (4, 3, +1)
46                        Some(max) if offset >= max => {
47                            let retracted_value = i64::try_from(offset - max + 1).unwrap();
48                            let row = Row::pack_slice(&[Datum::Int64(retracted_value)]);
49                            Some((
50                                LoadGeneratorOutput::Default,
51                                Event::Message(MzOffset::from(offset), (row, Diff::MINUS_ONE)),
52                            ))
53                        }
54                        _ => None,
55                    };
56
57                    let inserted_value = i64::try_from(offset + 1).unwrap();
58                    let row = Row::pack_slice(&[Datum::Int64(inserted_value)]);
59                    let insertion = [
60                        (
61                            LoadGeneratorOutput::Default,
62                            Event::Message(MzOffset::from(offset), (row, Diff::ONE)),
63                        ),
64                        (
65                            LoadGeneratorOutput::Default,
66                            Event::Progress(Some(MzOffset::from(offset + 1))),
67                        ),
68                    ];
69                    retraction.into_iter().chain(insertion)
70                })
71                .flatten(),
72        )
73    }
74}