// mz_persist_client/stats.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Aggregate statistics about data stored in persist.
use std::borrow::Cow;
use std::sync::Arc;
use mz_dyncfg::{Config, ConfigSet};
use crate::batch::UntrimmableColumns;
use crate::metrics::Metrics;
use crate::read::LazyPartStats;
use crate::ShardId;
/// Percent of filtered data to opt in to correctness auditing.
///
/// Expressed as a whole-number percent: the default of 1 means 1% of parts
/// that stats-based filtering would skip are audited anyway.
pub(crate) const STATS_AUDIT_PERCENT: Config<usize> = Config::new(
    "persist_stats_audit_percent",
    1,
    "Percent of filtered data to opt in to correctness auditing (Materialize).",
);
/// Computes and stores statistics about each batch part.
///
/// These can be used at read time to entirely skip fetching a part based on its
/// statistics. See [STATS_FILTER_ENABLED].
///
/// NOTE(review): this is a separate knob from the read-time filter, so stats
/// can presumably be collected ahead of enabling filtering — confirm intent.
pub(crate) const STATS_COLLECTION_ENABLED: Config<bool> = Config::new(
    "persist_stats_collection_enabled",
    true,
    "\
    Whether to calculate and record statistics about the data stored in \
    persist to be used at read time, see persist_stats_filter_enabled \
    (Materialize).",
);
/// Uses previously computed statistics about batch parts to entirely skip
/// fetching them at read time.
///
/// See [STATS_COLLECTION_ENABLED].
pub const STATS_FILTER_ENABLED: Config<bool> = Config::new(
    "persist_stats_filter_enabled",
    true,
    "\
    Whether to use recorded statistics about the data stored in persist to \
    filter at read time, see persist_stats_collection_enabled (Materialize).",
);
/// The budget (in bytes) of how many stats to write down per batch part. When
/// the budget is exceeded, stats will be trimmed away according to a variety of
/// heuristics.
///
/// The untrimmable-columns configs below exempt specific columns from this
/// trimming.
pub(crate) const STATS_BUDGET_BYTES: Config<usize> = Config::new(
    "persist_stats_budget_bytes",
    1024,
    "The budget (in bytes) of how many stats to maintain per batch part.",
);
/// Column names that are always retained during stats trimming when a column's
/// name matches one of them exactly (case-insensitive).
///
/// The default value is a comma-separated list; see [untrimmable_columns] for
/// how it is parsed.
pub(crate) const STATS_UNTRIMMABLE_COLUMNS_EQUALS: Config<fn() -> String> = Config::new(
    "persist_stats_untrimmable_columns_equals",
    // Default entries, comma-joined:
    // - "err": if we trim the "err" column, then we can't ever use pushdown on
    //   a part (because it could have >0 errors).
    // - "_fivetran_deleted": Fivetran created tables track deleted rows by
    //   setting this column. See
    //   <https://fivetran.com/docs/using-fivetran/features#capturedeletes>.
    || "err,ts,receivedat,createdat,_fivetran_deleted".to_owned(),
    "\
    Which columns to always retain during persist stats trimming. Any column \
    with a name exactly equal (case-insensitive) to one of these will be kept. \
    Comma separated list.",
);
/// Column-name prefixes that protect a column from stats trimming: any column
/// whose name starts with one of these (case-insensitive) is kept.
pub(crate) const STATS_UNTRIMMABLE_COLUMNS_PREFIX: Config<fn() -> String> = Config::new(
    "persist_stats_untrimmable_columns_prefix",
    // A single default entry, so no comma separator is needed.
    || "last_".to_owned(),
    "\
    Which columns to always retain during persist stats trimming. Any column \
    with a name starting with (case-insensitive) one of these will be kept. \
    Comma separated list.",
);
/// Column-name suffixes that protect a column from stats trimming: any column
/// whose name ends with one of these (case-insensitive) is kept.
pub(crate) const STATS_UNTRIMMABLE_COLUMNS_SUFFIX: Config<fn() -> String> = Config::new(
    "persist_stats_untrimmable_columns_suffix",
    // Default entries ("timestamp", "time", "_at", "_tstamp"), comma-joined.
    || "timestamp,time,_at,_tstamp".to_owned(),
    "\
    Which columns to always retain during persist stats trimming. Any column \
    with a name ending with (case-insensitive) one of these will be kept. \
    Comma separated list.",
);
/// Reads the three untrimmable-column configs out of `cfg` and assembles them
/// into an [UntrimmableColumns].
pub(crate) fn untrimmable_columns(cfg: &ConfigSet) -> UntrimmableColumns {
    /// Splits a comma-separated config value into owned entries, dropping any
    /// empty segments (e.g. from leading, trailing, or doubled commas).
    fn parse_csv(raw: String) -> Vec<Cow<'static, str>> {
        raw.split(',')
            .filter(|part| !part.is_empty())
            .map(|part| Cow::Owned(part.to_owned()))
            .collect()
    }
    UntrimmableColumns {
        equals: parse_csv(STATS_UNTRIMMABLE_COLUMNS_EQUALS.get(cfg)),
        prefixes: parse_csv(STATS_UNTRIMMABLE_COLUMNS_PREFIX.get(cfg)),
        suffixes: parse_csv(STATS_UNTRIMMABLE_COLUMNS_SUFFIX.get(cfg)),
    }
}
/// Statistics about the contents of a shard as_of some time.
///
/// TODO: Add more stats here as they become necessary.
#[derive(Debug)]
pub struct SnapshotStats {
    /// The shard these statistics are for.
    pub shard_id: ShardId,
    /// An estimate of the count of updates in the shard.
    ///
    /// This is an upper bound on the number of updates that persist_source
    /// would emit if you snapshot the source at the given as_of. The real
    /// number of updates, after consolidation, might be lower. It includes both
    /// additions and retractions.
    ///
    /// NB: Because of internal persist compaction, the answer for a given as_of
    /// may change over time (as persist advances through Seqnos), but because
    /// compaction never results in more updates than the sum of the inputs, it
    /// can only go down.
    pub num_updates: usize,
}
/// Statistics about the contents of the parts of a shard as_of some time.
///
/// Unlike [SnapshotStats], this exposes per-part detail via
/// [SnapshotPartStats] rather than a single aggregate.
#[derive(Debug)]
pub struct SnapshotPartsStats {
    /// Metrics for the persist backing shard, so the caller can report any
    /// necessary counters.
    pub metrics: Arc<Metrics>,
    /// The shard these statistics are for.
    pub shard_id: ShardId,
    /// Stats for individual parts.
    pub parts: Vec<SnapshotPartStats>,
}
/// Part-specific stats.
#[derive(Debug)]
pub struct SnapshotPartStats {
    /// The size of the encoded data in bytes.
    pub encoded_size_bytes: usize,
    /// The raw/encoded statistics for that part, if we have them.
    ///
    /// `None` presumably when stats were never collected for the part (see
    /// [STATS_COLLECTION_ENABLED]) — TODO confirm all paths that leave this
    /// unset.
    pub stats: Option<LazyPartStats>,
}