// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Aggregate statistics about data stored in persist.

use std::borrow::Cow;
use std::sync::Arc;

use mz_dyncfg::{Config, ConfigSet};
use mz_persist::indexed::columnar::ColumnarRecords;
use mz_persist_types::part::PartBuilder;
use mz_persist_types::stats::PartStats;
use mz_persist_types::Codec;

use crate::batch::UntrimmableColumns;
use crate::internal::encoding::Schemas;
use crate::metrics::Metrics;
use crate::read::LazyPartStats;
use crate::ShardId;

/// Percent of filtered data to opt in to correctness auditing.
pub(crate) const STATS_AUDIT_PERCENT: Config<usize> = Config::new(
    "persist_stats_audit_percent",
    0,
    "Percent of filtered data to opt in to correctness auditing (Materialize).",
);

/// Computes and stores statistics about each batch part.
///
/// These can be used at read time to entirely skip fetching a part based on its
/// statistics. See [STATS_FILTER_ENABLED].
pub(crate) const STATS_COLLECTION_ENABLED: Config<bool> = Config::new(
    "persist_stats_collection_enabled",
    true,
    "\
    Whether to calculate and record statistics about the data stored in \
    persist to be used at read time, see persist_stats_filter_enabled \
    (Materialize).",
);

/// Uses previously computed statistics about batch parts to entirely skip
/// fetching them at read time.
///
/// See `STATS_COLLECTION_ENABLED`.
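///
/// A minimal sketch of consulting this flag at a read site (assuming `cfg` is
/// a [ConfigSet] with this config registered, per the usual dyncfg pattern in
/// this crate):
///
/// ```ignore
/// let cfg = ConfigSet::default().add(&STATS_FILTER_ENABLED);
/// if STATS_FILTER_ENABLED.get(&cfg) {
///     // Consult a part's recorded stats and skip the fetch entirely when
///     // the filter proves the part can't contain matching data.
/// }
/// ```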
pub const STATS_FILTER_ENABLED: Config<bool> = Config::new(
    "persist_stats_filter_enabled",
    true,
    "\
    Whether to use recorded statistics about the data stored in persist to \
    filter at read time, see persist_stats_collection_enabled (Materialize).",
);

/// The budget (in bytes) of stats to write down per batch part. When the
/// budget is exceeded, stats will be trimmed away according to a variety of
/// heuristics.
pub(crate) const STATS_BUDGET_BYTES: Config<usize> = Config::new(
    "persist_stats_budget_bytes",
    1024,
    "The budget (in bytes) of stats to maintain per batch part.",
);

pub(crate) const STATS_UNTRIMMABLE_COLUMNS_EQUALS: Config<fn() -> String> = Config::new(
    "persist_stats_untrimmable_columns_equals",
    || {
        [
            // If we trim the "err" column, then we can't ever use pushdown on a
            // part (because it could have >0 errors).
            "err",
            "ts",
            "receivedat",
            "createdat",
            // Fivetran created tables track deleted rows by setting this column.
            //
            // See <https://fivetran.com/docs/using-fivetran/features#capturedeletes>.
            "_fivetran_deleted",
        ]
        .join(",")
    },
    "\
    Which columns to always retain during persist stats trimming. Any column \
    with a name exactly equal (case-insensitive) to one of these will be kept. \
    Comma separated list.",
);

pub(crate) const STATS_UNTRIMMABLE_COLUMNS_PREFIX: Config<fn() -> String> = Config::new(
    "persist_stats_untrimmable_columns_prefix",
    || ["last_"].join(","),
    "\
    Which columns to always retain during persist stats trimming. Any column \
    with a name starting with (case-insensitive) one of these will be kept. \
    Comma separated list.",
);

pub(crate) const STATS_UNTRIMMABLE_COLUMNS_SUFFIX: Config<fn() -> String> = Config::new(
    "persist_stats_untrimmable_columns_suffix",
    || ["timestamp", "time", "_at", "_tstamp"].join(","),
    "\
    Which columns to always retain during persist stats trimming. Any column \
    with a name ending with (case-insensitive) one of these will be kept. \
    Comma separated list.",
);

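/// Returns the columns that stats trimming must always retain, as configured
/// by the `persist_stats_untrimmable_columns_*` flags above.
///
/// A minimal sketch of how the comma-separated flag values parse (assuming
/// all three flags are registered with the given [ConfigSet]; empty segments
/// are dropped):
///
/// ```ignore
/// let cfg = ConfigSet::default()
///     .add(&STATS_UNTRIMMABLE_COLUMNS_EQUALS)
///     .add(&STATS_UNTRIMMABLE_COLUMNS_PREFIX)
///     .add(&STATS_UNTRIMMABLE_COLUMNS_SUFFIX);
/// let cols = untrimmable_columns(&cfg);
/// assert!(cols.equals.contains(&"err".into()));
/// assert!(cols.prefixes.contains(&"last_".into()));
/// assert!(cols.suffixes.contains(&"_at".into()));
/// ```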
pub(crate) fn untrimmable_columns(cfg: &ConfigSet) -> UntrimmableColumns {
    fn split(x: String) -> Vec<Cow<'static, str>> {
        x.split(',')
            .filter(|x| !x.is_empty())
            .map(|x| x.to_owned().into())
            .collect()
    }
    UntrimmableColumns {
        equals: split(STATS_UNTRIMMABLE_COLUMNS_EQUALS.get(cfg)),
        prefixes: split(STATS_UNTRIMMABLE_COLUMNS_PREFIX.get(cfg)),
        suffixes: split(STATS_UNTRIMMABLE_COLUMNS_SUFFIX.get(cfg)),
    }
}

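/// Computes stats for a part in the legacy [ColumnarRecords] format.
///
/// It does so by decoding every update and re-encoding it through the new
/// columnar [PartBuilder]; see the note below on why that's acceptable.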
pub(crate) fn part_stats_for_legacy_part<K: Codec, V: Codec>(
    schemas: &Schemas<K, V>,
    part: &[ColumnarRecords],
) -> Result<PartStats, String> {
    // This is a laughably inefficient placeholder implementation of stats
    // on the old part format. We don't intend to make this fast; instead,
    // we intend to compute stats on the new part format.
    let mut builder = PartBuilder::new(schemas.key.as_ref(), schemas.val.as_ref())?;
    for x in part {
        for ((k, v), t, d) in x.iter() {
            let k = K::decode(k)?;
            let v = V::decode(v)?;
            let t = i64::from_le_bytes(t);
            let d = i64::from_le_bytes(d);

            builder.push(&k, &v, t, d);
        }
    }
    let part = builder.finish();

    PartStats::new(&part)
}

/// Statistics about the contents of a shard as_of some time.
///
/// TODO: Add more stats here as they become necessary.
#[derive(Debug)]
pub struct SnapshotStats {
    /// The shard these statistics are for.
    pub shard_id: ShardId,
    /// An estimate of the count of updates in the shard.
    ///
    /// This is an upper bound on the number of updates that persist_source
    /// would emit if you snapshot the source at the given as_of. The real
    /// number of updates, after consolidation, might be lower. It includes both
    /// additions and retractions.
    ///
    /// NB: Because of internal persist compaction, the answer for a given as_of
    /// may change over time (as persist advances through Seqnos), but because
    /// compaction never results in more updates than the sum of the inputs, it
    /// can only go down.
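    ///
    /// For example, a part containing the updates `("k", 2, +1)` and
    /// `("k", 2, -1)` contributes 2 to this count, even though the two
    /// updates consolidate away to nothing.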
    pub num_updates: usize,
}

/// Statistics about the contents of the parts of a shard as_of some time.
#[derive(Debug)]
pub struct SnapshotPartsStats {
    /// Metrics for the persist backing shard, so the caller can report any
    /// necessary counters.
    pub metrics: Arc<Metrics>,
    /// The shard these statistics are for.
    pub shard_id: ShardId,
    /// Stats for individual parts.
    pub parts: Vec<SnapshotPartStats>,
}

/// Part-specific stats.
#[derive(Debug)]
pub struct SnapshotPartStats {
    /// The size of the encoded data in bytes.
    pub encoded_size_bytes: usize,
    /// The raw/encoded statistics for that part, if we have them.
    pub stats: Option<LazyPartStats>,
}