1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use mz_ore::now::NowFn;
use mz_repr::{Datum, Row};
use mz_storage_types::sources::load_generator::{Event, Generator};
use mz_storage_types::sources::MzOffset;

/// Configuration for the counter load generator.
///
/// The [`Generator`] implementation for this type emits consecutive `Int64`
/// values, optionally retracting older values once `max_cardinality` values
/// have been emitted.
pub struct Counter {
    /// How many values will be emitted before old ones are retracted,
    /// or `None` for an append-only collection.
    ///
    /// If this retraction behavior is changed,
    /// `mz_storage_types::sources::LoadGenerator::is_monotonic`
    /// must be updated.
    pub max_cardinality: Option<u64>,
}

impl Generator for Counter {
    /// Produces an unbounded stream of counter events starting at `resume_offset`.
    ///
    /// For every offset `o` (beginning with `resume_offset.offset`) the stream
    /// yields, in this order:
    ///
    /// 1. if `max_cardinality` is `Some(max)` and `o >= max`, a retraction
    ///    (diff `-1`) of the value `o - max + 1`;
    /// 2. an insertion (diff `+1`) of the value `o + 1`;
    /// 3. a progress event carrying the offset `o + 1`.
    ///
    /// Every event is tagged with output index `0`. The `_now` and `_seed`
    /// arguments are not used.
    ///
    /// # Panics
    ///
    /// Panics while iterating if a value to emit does not fit in an `i64`.
    fn by_seed(
        &self,
        _now: NowFn,
        _seed: Option<u64>,
        resume_offset: MzOffset,
    ) -> Box<(dyn Iterator<Item = (usize, Event<Option<MzOffset>, (Row, i64)>)>)> {
        let cap = self.max_cardinality;

        let events = (resume_offset.offset..).flat_map(move |offset| {
            // Once `offset` reaches the cap, each step also retracts the oldest
            // live value. The value inserted at offset `k` is `k + 1`, so the
            // value inserted `cap` steps ago is `offset - cap + 1`. With a cap
            // of 2 the (value, offset, diff) triples are:
            //   (1, 0, +1)
            //   (2, 1, +1)
            //   (1, 2, -1)  <- first retraction: offset 2 >= cap 2
            //   (3, 2, +1)
            //   (2, 3, -1)
            //   (4, 3, +1)
            let retract = cap.filter(|&max| offset >= max).map(|max| {
                let stale = i64::try_from(offset - max + 1).unwrap();
                let packed = Row::pack_slice(&[Datum::Int64(stale)]);
                (0, Event::Message(MzOffset::from(offset), (packed, -1)))
            });

            // The new value for this offset, followed by a progress statement
            // announcing the next offset.
            let fresh = i64::try_from(offset + 1).unwrap();
            let packed = Row::pack_slice(&[Datum::Int64(fresh)]);
            let insert = (0, Event::Message(MzOffset::from(offset), (packed, 1)));
            let progress = (0, Event::Progress(Some(MzOffset::from(offset + 1))));

            // The retraction (if any) precedes the insertion at the same offset.
            retract.into_iter().chain([insert, progress])
        });

        Box::new(events)
    }
}