mz_persist_client/stats.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Aggregate statistics about data stored in persist.

use std::borrow::Cow;
use std::sync::Arc;

use mz_dyncfg::{Config, ConfigSet};

use crate::batch::UntrimmableColumns;
use crate::metrics::Metrics;
use crate::read::LazyPartStats;

use crate::ShardId;

/// Percent of filtered data to opt in to correctness auditing.
pub(crate) const STATS_AUDIT_PERCENT: Config<usize> = Config::new(
    "persist_stats_audit_percent",
    1,
    "Percent of filtered data to opt in to correctness auditing (Materialize).",
);

/// Whether to panic (rather than just report an error) when a stats
/// correctness audit fails.
pub const STATS_AUDIT_PANIC: Config<bool> = Config::new(
    "persist_stats_audit_panic",
    true,
    "If set (as it is by default), panic on any correctness-audit failure; if not, report an \
    error but pass along the data as normal. This should almost certainly be paired with an \
    audit rate (persist_stats_audit_percent) of 100%, so that all parts are audited.",
);

/// Computes and stores statistics about each batch part.
///
/// These can be used at read time to entirely skip fetching a part based on its
/// statistics. See [STATS_FILTER_ENABLED].
pub(crate) const STATS_COLLECTION_ENABLED: Config<bool> = Config::new(
    "persist_stats_collection_enabled",
    true,
    "\
    Whether to calculate and record statistics about the data stored in \
    persist to be used at read time, see persist_stats_filter_enabled \
    (Materialize).",
);

/// Uses previously computed statistics about batch parts to entirely skip
/// fetching them at read time.
///
/// See `STATS_COLLECTION_ENABLED`.
pub const STATS_FILTER_ENABLED: Config<bool> = Config::new(
    "persist_stats_filter_enabled",
    true,
    "\
    Whether to use recorded statistics about the data stored in persist to \
    filter at read time, see persist_stats_collection_enabled (Materialize).",
);
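
// Illustrative sketch, not part of the original file: both knobs above are
// plain dyncfg booleans, so the write and read paths gate their stats work by
// reading them from a `ConfigSet` via `Config::get`. These helpers are
// hypothetical and exist only to make that usage concrete.
#[allow(dead_code)]
pub(crate) fn collect_stats_enabled(cfg: &ConfigSet) -> bool {
    // Write side: whether to compute and record per-part stats at all.
    STATS_COLLECTION_ENABLED.get(cfg)
}

#[allow(dead_code)]
pub(crate) fn filter_with_stats_enabled(cfg: &ConfigSet) -> bool {
    // Read side: whether recorded stats may be used to skip fetching parts.
    STATS_FILTER_ENABLED.get(cfg)
}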

/// The budget (in bytes) of how many stats to write down per batch part. When
/// the budget is exceeded, stats will be trimmed away according to a variety of
/// heuristics.
pub(crate) const STATS_BUDGET_BYTES: Config<usize> = Config::new(
    "persist_stats_budget_bytes",
    1024,
    "The budget (in bytes) of how many stats to maintain per batch part.",
);

pub(crate) const STATS_UNTRIMMABLE_COLUMNS_EQUALS: Config<fn() -> String> = Config::new(
    "persist_stats_untrimmable_columns_equals",
    || {
        [
            // If we trim the "err" column, then we can't ever use pushdown on a
            // part (because it could have >0 errors).
            "err",
            "ts",
            "receivedat",
            "createdat",
            // Fivetran-created tables track deleted rows by setting this column.
            //
            // See <https://fivetran.com/docs/using-fivetran/features#capturedeletes>.
            "_fivetran_deleted",
        ]
        .join(",")
    },
    "\
    Which columns to always retain during persist stats trimming. Any column \
    with a name exactly equal (case-insensitive) to one of these will be kept. \
    Comma separated list.",
);

pub(crate) const STATS_UNTRIMMABLE_COLUMNS_PREFIX: Config<fn() -> String> = Config::new(
    "persist_stats_untrimmable_columns_prefix",
    || ["last_"].join(","),
    "\
    Which columns to always retain during persist stats trimming. Any column \
    with a name starting with (case-insensitive) one of these will be kept. \
    Comma separated list.",
);

pub(crate) const STATS_UNTRIMMABLE_COLUMNS_SUFFIX: Config<fn() -> String> = Config::new(
    "persist_stats_untrimmable_columns_suffix",
    || ["timestamp", "time", "_at", "_tstamp"].join(","),
    "\
    Which columns to always retain during persist stats trimming. Any column \
    with a name ending with (case-insensitive) one of these will be kept. \
    Comma separated list.",
);

/// Returns the columns that should never be trimmed from part stats, as
/// configured by the `persist_stats_untrimmable_columns_*` dyncfgs above.
pub(crate) fn untrimmable_columns(cfg: &ConfigSet) -> UntrimmableColumns {
    fn split(x: String) -> Vec<Cow<'static, str>> {
        x.split(',')
            .filter(|x| !x.is_empty())
            .map(|x| x.to_owned().into())
            .collect()
    }
    UntrimmableColumns {
        equals: split(STATS_UNTRIMMABLE_COLUMNS_EQUALS.get(cfg)),
        prefixes: split(STATS_UNTRIMMABLE_COLUMNS_PREFIX.get(cfg)),
        suffixes: split(STATS_UNTRIMMABLE_COLUMNS_SUFFIX.get(cfg)),
    }
}
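
// Illustrative sketch, not part of the original file: how the comma-separated
// defaults above parse into `UntrimmableColumns`. This assumes the usual
// `ConfigSet::default().add(..)` registration pattern from mz_dyncfg; adjust
// to however the surrounding crate actually builds its `ConfigSet`.
#[cfg(test)]
mod untrimmable_columns_example {
    use super::*;

    #[test]
    fn defaults_parse_into_untrimmable_columns() {
        // Register only the three configs that untrimmable_columns reads.
        let cfg = ConfigSet::default()
            .add(&STATS_UNTRIMMABLE_COLUMNS_EQUALS)
            .add(&STATS_UNTRIMMABLE_COLUMNS_PREFIX)
            .add(&STATS_UNTRIMMABLE_COLUMNS_SUFFIX);
        let cols = untrimmable_columns(&cfg);
        // Each default string is split on ',' into its respective list.
        assert!(cols.equals.iter().any(|c| c == "err"));
        assert!(cols.prefixes.iter().any(|c| c == "last_"));
        assert!(cols.suffixes.iter().any(|c| c == "timestamp"));
    }
}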

/// Statistics about the contents of a shard as_of some time.
///
/// TODO: Add more stats here as they become necessary.
#[derive(Debug)]
pub struct SnapshotStats {
    /// The shard these statistics are for.
    pub shard_id: ShardId,
    /// An estimate of the count of updates in the shard.
    ///
    /// This is an upper bound on the number of updates that persist_source
    /// would emit if you snapshot the source at the given as_of, and it
    /// includes both additions and retractions. The real number of updates,
    /// after consolidation, might be lower.
    ///
    /// NB: Because of internal persist compaction, the answer for a given as_of
    /// may change over time (as persist advances through Seqnos), but because
    /// compaction never results in more updates than the sum of the inputs, it
    /// can only go down.
    pub num_updates: usize,
}

/// Statistics about the contents of the parts of a shard as_of some time.
#[derive(Debug)]
pub struct SnapshotPartsStats {
    /// Metrics for the persist backing shard, so the caller can report any
    /// necessary counters.
    pub metrics: Arc<Metrics>,
    /// The shard these statistics are for.
    pub shard_id: ShardId,
    /// Stats for individual parts.
    pub parts: Vec<SnapshotPartStats>,
}

/// Part-specific stats.
#[derive(Debug)]
pub struct SnapshotPartStats {
    /// The size of the encoded data in bytes.
    pub encoded_size_bytes: usize,
    /// The raw/encoded statistics for that part, if we have them.
    pub stats: Option<LazyPartStats>,
}