// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Aggregate statistics about data stored in persist.

use std::borrow::Cow;
use std::sync::Arc;

use mz_dyncfg::{Config, ConfigSet};

use crate::batch::UntrimmableColumns;
use crate::metrics::Metrics;
use crate::read::LazyPartStats;

use crate::ShardId;

/// Percent of filtered data to opt in to correctness auditing.
pub(crate) const STATS_AUDIT_PERCENT: Config<usize> = Config::new(
    "persist_stats_audit_percent",
    1,
    "Percent of filtered data to opt in to correctness auditing (Materialize).",
);

/// Computes and stores statistics about each batch part.
///
/// These can be used at read time to entirely skip fetching a part based on its
/// statistics. See [STATS_FILTER_ENABLED].
pub(crate) const STATS_COLLECTION_ENABLED: Config<bool> = Config::new(
    "persist_stats_collection_enabled",
    true,
    "\
    Whether to calculate and record statistics about the data stored in \
    persist to be used at read time, see persist_stats_filter_enabled \
    (Materialize).",
);

/// Uses previously computed statistics about batch parts to entirely skip
/// fetching them at read time.
///
/// See `STATS_COLLECTION_ENABLED`.
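///
/// A minimal sketch (illustration only) of how a reader might consult this
/// flag, assuming a `ConfigSet` that has it registered; `should_fetch_part`
/// and `skip_per_stats` are hypothetical placeholders for the result of
/// evaluating a part's stats against a pushdown filter:
///
/// ```ignore
/// fn should_fetch_part(cfg: &ConfigSet, skip_per_stats: bool) -> bool {
///     // With stats-based filtering disabled, always fetch the part.
///     if !STATS_FILTER_ENABLED.get(cfg) {
///         return true;
///     }
///     // Otherwise, fetch only the parts the stats could not rule out.
///     !skip_per_stats
/// }
/// ```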
pub const STATS_FILTER_ENABLED: Config<bool> = Config::new(
    "persist_stats_filter_enabled",
    true,
    "\
    Whether to use recorded statistics about the data stored in persist to \
    filter at read time, see persist_stats_collection_enabled (Materialize).",
);

/// The budget (in bytes) of how many stats to write down per batch part. When
/// the budget is exceeded, stats will be trimmed away according to a variety of
/// heuristics.
pub(crate) const STATS_BUDGET_BYTES: Config<usize> = Config::new(
    "persist_stats_budget_bytes",
    1024,
    "The budget (in bytes) of how many stats to maintain per batch part.",
);

pub(crate) const STATS_UNTRIMMABLE_COLUMNS_EQUALS: Config<fn() -> String> = Config::new(
    "persist_stats_untrimmable_columns_equals",
    || {
        [
            // If we trim the "err" column, then we can't ever use pushdown on a
            // part (because it could have >0 errors).
            "err",
            "ts",
            "receivedat",
            "createdat",
            // Fivetran created tables track deleted rows by setting this column.
            //
            // See <https://fivetran.com/docs/using-fivetran/features#capturedeletes>.
            "_fivetran_deleted",
        ]
        .join(",")
    },
    "\
    Which columns to always retain during persist stats trimming. Any column \
    with a name exactly equal (case-insensitive) to one of these will be kept. \
    Comma separated list.",
);

pub(crate) const STATS_UNTRIMMABLE_COLUMNS_PREFIX: Config<fn() -> String> = Config::new(
    "persist_stats_untrimmable_columns_prefix",
    || ["last_"].join(","),
    "\
    Which columns to always retain during persist stats trimming. Any column \
    with a name starting with (case-insensitive) one of these will be kept. \
    Comma separated list.",
);

pub(crate) const STATS_UNTRIMMABLE_COLUMNS_SUFFIX: Config<fn() -> String> = Config::new(
    "persist_stats_untrimmable_columns_suffix",
    || ["timestamp", "time", "_at", "_tstamp"].join(","),
    "\
    Which columns to always retain during persist stats trimming. Any column \
    with a name ending with (case-insensitive) one of these will be kept. \
    Comma separated list.",
);
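
/// Returns the columns that should never be trimmed from part stats, as
/// configured by the `persist_stats_untrimmable_columns_*` configs above.
///
/// A minimal sketch (illustration only) of what the defaults parse into. The
/// builder-style `ConfigSet::default().add(..)` registration and the direct
/// field access below are assumptions made for the sake of the example:
///
/// ```ignore
/// let cfg = ConfigSet::default()
///     .add(&STATS_UNTRIMMABLE_COLUMNS_EQUALS)
///     .add(&STATS_UNTRIMMABLE_COLUMNS_PREFIX)
///     .add(&STATS_UNTRIMMABLE_COLUMNS_SUFFIX);
/// let cols = untrimmable_columns(&cfg);
/// // The comma-separated defaults land in the corresponding fields.
/// assert!(cols.equals.iter().any(|c| c == "err"));
/// assert!(cols.prefixes.iter().any(|c| c == "last_"));
/// assert!(cols.suffixes.iter().any(|c| c == "timestamp"));
/// ```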
pub(crate) fn untrimmable_columns(cfg: &ConfigSet) -> UntrimmableColumns {
    fn split(x: String) -> Vec<Cow<'static, str>> {
        x.split(',')
            .filter(|x| !x.is_empty())
            .map(|x| x.to_owned().into())
            .collect()
    }
    UntrimmableColumns {
        equals: split(STATS_UNTRIMMABLE_COLUMNS_EQUALS.get(cfg)),
        prefixes: split(STATS_UNTRIMMABLE_COLUMNS_PREFIX.get(cfg)),
        suffixes: split(STATS_UNTRIMMABLE_COLUMNS_SUFFIX.get(cfg)),
    }
}

/// Statistics about the contents of a shard as_of some time.
///
/// TODO: Add more stats here as they become necessary.
#[derive(Debug)]
pub struct SnapshotStats {
    /// The shard these statistics are for.
    pub shard_id: ShardId,
    /// An estimate of the count of updates in the shard.
    ///
    /// This is an upper bound on the number of updates that persist_source
    /// would emit if you snapshot the source at the given as_of. The real
    /// number of updates, after consolidation, might be lower. It includes both
    /// additions and retractions.
    ///
    /// NB: Because of internal persist compaction, the answer for a given as_of
    /// may change over time (as persist advances through Seqnos), but because
    /// compaction never results in more updates than the sum of the inputs, it
    /// can only go down.
    pub num_updates: usize,
}

/// Statistics about the contents of the parts of a shard as_of some time.
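///
/// A minimal sketch (illustration only; `total_encoded_bytes` is a
/// hypothetical helper, not part of this crate) of aggregating over the
/// per-part stats:
///
/// ```ignore
/// fn total_encoded_bytes(stats: &SnapshotPartsStats) -> usize {
///     stats.parts.iter().map(|p| p.encoded_size_bytes).sum()
/// }
/// ```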
#[derive(Debug)]
pub struct SnapshotPartsStats {
    /// Metrics for the persist backing shard, so the caller can report any
    /// necessary counters.
    pub metrics: Arc<Metrics>,
    /// The shard these statistics are for.
    pub shard_id: ShardId,
    /// Stats for individual parts.
    pub parts: Vec<SnapshotPartStats>,
}

/// Part-specific stats.
#[derive(Debug)]
pub struct SnapshotPartStats {
    /// The size of the encoded data in bytes.
    pub encoded_size_bytes: usize,
    /// The raw/encoded statistics for that part, if we have them.
    pub stats: Option<LazyPartStats>,
}