use std::borrow::Cow;
use std::sync::Arc;
use mz_dyncfg::{Config, ConfigSet};
use mz_persist::indexed::columnar::ColumnarRecordsStructuredExt;
use mz_persist::indexed::encoding::BlobTraceUpdates;
use mz_persist_types::columnar::{codec_to_schema2, ColumnDecoder, Schema2};
use mz_persist_types::stats::PartStats;
use mz_persist_types::Codec;
use crate::batch::UntrimmableColumns;
use crate::internal::encoding::Schemas;
use crate::metrics::Metrics;
use crate::read::LazyPartStats;
use crate::ShardId;
pub(crate) const STATS_AUDIT_PERCENT: Config<usize> = Config::new(
"persist_stats_audit_percent",
1,
"Percent of filtered data to opt in to correctness auditing (Materialize).",
);
pub(crate) const STATS_COLLECTION_ENABLED: Config<bool> = Config::new(
"persist_stats_collection_enabled",
true,
"\
Whether to calculate and record statistics about the data stored in \
persist to be used at read time, see persist_stats_filter_enabled \
(Materialize).",
);
pub const STATS_FILTER_ENABLED: Config<bool> = Config::new(
"persist_stats_filter_enabled",
true,
"\
Whether to use recorded statistics about the data stored in persist to \
filter at read time, see persist_stats_collection_enabled (Materialize).",
);
pub(crate) const STATS_BUDGET_BYTES: Config<usize> = Config::new(
"persist_stats_budget_bytes",
1024,
"The budget (in bytes) of how many stats to maintain per batch part.",
);
pub(crate) const STATS_UNTRIMMABLE_COLUMNS_EQUALS: Config<fn() -> String> = Config::new(
"persist_stats_untrimmable_columns_equals",
|| {
[
"err",
"ts",
"receivedat",
"createdat",
"_fivetran_deleted",
]
.join(",")
},
"\
Which columns to always retain during persist stats trimming. Any column \
with a name exactly equal (case-insensitive) to one of these will be kept. \
Comma separated list.",
);
pub(crate) const STATS_UNTRIMMABLE_COLUMNS_PREFIX: Config<fn() -> String> = Config::new(
"persist_stats_untrimmable_columns_prefix",
|| ["last_"].join(","),
"\
Which columns to always retain during persist stats trimming. Any column \
with a name starting with (case-insensitive) one of these will be kept. \
Comma separated list.",
);
pub(crate) const STATS_UNTRIMMABLE_COLUMNS_SUFFIX: Config<fn() -> String> = Config::new(
"persist_stats_untrimmable_columns_suffix",
|| ["timestamp", "time", "_at", "_tstamp"].join(","),
"\
Which columns to always retain during persist stats trimming. Any column \
with a name ending with (case-insensitive) one of these will be kept. \
Comma separated list.",
);
pub(crate) fn untrimmable_columns(cfg: &ConfigSet) -> UntrimmableColumns {
fn split(x: String) -> Vec<Cow<'static, str>> {
x.split(',')
.filter(|x| !x.is_empty())
.map(|x| x.to_owned().into())
.collect()
}
UntrimmableColumns {
equals: split(STATS_UNTRIMMABLE_COLUMNS_EQUALS.get(cfg)),
prefixes: split(STATS_UNTRIMMABLE_COLUMNS_PREFIX.get(cfg)),
suffixes: split(STATS_UNTRIMMABLE_COLUMNS_SUFFIX.get(cfg)),
}
}
pub(crate) fn encode_updates<K, V>(
write_schemas: &Schemas<K, V>,
updates: &BlobTraceUpdates,
) -> Result<(Option<ColumnarRecordsStructuredExt>, PartStats), String>
where
K: Codec,
V: Codec,
{
let records = updates.records();
let ext = match updates.structured() {
Some(ext) => ext.clone(),
None => ColumnarRecordsStructuredExt {
key: codec_to_schema2::<K>(write_schemas.key.as_ref(), records.keys())
.map_err(|e| e.to_string())?,
val: codec_to_schema2::<V>(write_schemas.val.as_ref(), records.vals())
.map_err(|e| e.to_string())?,
},
};
let key_stats = write_schemas
.key
.decoder_any(ext.key.as_ref())
.map_err(|e| e.to_string())?
.stats();
Ok((Some(ext), PartStats { key: key_stats }))
}
#[derive(Debug)]
pub struct SnapshotStats {
pub shard_id: ShardId,
pub num_updates: usize,
}
#[derive(Debug)]
pub struct SnapshotPartsStats {
pub metrics: Arc<Metrics>,
pub shard_id: ShardId,
pub parts: Vec<SnapshotPartStats>,
}
#[derive(Debug)]
pub struct SnapshotPartStats {
pub encoded_size_bytes: usize,
pub stats: Option<LazyPartStats>,
}