use std::collections::BTreeMap;
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant};

use async_stream::stream;
use async_trait::async_trait;
use bytes::Bytes;
use futures_util::StreamExt;
use mz_ore::bytes::SegmentedBytes;
use mz_ore::cast::{CastFrom, CastLossy};
use mz_ore::instrument;
use mz_ore::metric;
use mz_ore::metrics::{
    raw, ComputedGauge, ComputedIntGauge, ComputedUIntGauge, Counter, DeleteOnDropCounter,
    DeleteOnDropGauge, IntCounter, MakeCollector, MetricVecExt, MetricsRegistry, UIntGauge,
    UIntGaugeVec,
};
use mz_ore::stats::histogram_seconds_buckets;
use mz_persist::location::{
    Blob, BlobMetadata, CaSResult, Consensus, ExternalError, ResultStream, SeqNo, VersionedData,
};
use mz_persist::metrics::{ColumnarMetrics, S3BlobMetrics};
use mz_persist::retry::RetryStream;
use mz_persist_types::Codec64;
use mz_persist_types::stats::PartStatsMetrics;
use mz_postgres_client::metrics::PostgresClientMetrics;
use prometheus::core::{AtomicI64, AtomicU64, Collector, Desc, GenericGauge};
use prometheus::proto::MetricFamily;
use prometheus::{CounterVec, Gauge, GaugeVec, Histogram, HistogramVec, IntCounterVec};
use timely::progress::Antichain;
use tokio::sync::{OnceCell, OwnedSemaphorePermit, Semaphore};
use tokio_metrics::TaskMonitor;
use tracing::{debug, info, info_span, Instrument};

use crate::fetch::{FETCH_SEMAPHORE_COST_ADJUSTMENT, FETCH_SEMAPHORE_PERMIT_ADJUSTMENT};
use crate::internal::paths::BlobKey;
use crate::{PersistConfig, ShardId};
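
/// All of persist's metrics, rooted in a single struct so that every
/// subsystem (blob, consensus, compaction, GC, etc.) registers against one
/// [MetricsRegistry].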
pub struct Metrics {
_vecs: MetricsVecs,
_uptime: ComputedGauge,
pub blob: BlobMetrics,
pub consensus: ConsensusMetrics,
pub cmds: CmdsMetrics,
pub retries: RetriesMetrics,
pub user: BatchWriteMetrics,
pub read: BatchPartReadMetrics,
pub compaction: CompactionMetrics,
pub gc: GcMetrics,
pub lease: LeaseMetrics,
pub codecs: CodecsMetrics,
pub state: StateMetrics,
pub shards: ShardsMetrics,
pub audit: UsageAuditMetrics,
pub locks: LocksMetrics,
pub watch: WatchMetrics,
pub pubsub_client: PubSubClientMetrics,
pub pushdown: PushdownMetrics,
pub consolidation: ConsolidationMetrics,
pub blob_cache_mem: BlobMemCache,
pub tasks: TasksMetrics,
pub columnar: ColumnarMetrics,
pub schema: SchemaMetrics,
pub inline: InlineMetrics,
pub(crate) semaphore: SemaphoreMetrics,
pub sink: SinkMetrics,
pub s3_blob: S3BlobMetrics,
pub postgres_consensus: PostgresClientMetrics,
#[allow(dead_code)]
pub(crate) registry: MetricsRegistry,
}
impl std::fmt::Debug for Metrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Metrics").finish_non_exhaustive()
}
}
impl Metrics {
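    /// Registers all of persist's metrics against `registry` and returns the
    /// resulting handle.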
pub fn new(cfg: &PersistConfig, registry: &MetricsRegistry) -> Self {
let vecs = MetricsVecs::new(registry);
let start = Instant::now();
let uptime = registry.register_computed_gauge(
metric!(
name: "mz_persist_metadata_seconds",
help: "server uptime, labels are build metadata",
const_labels: {
"version" => cfg.build_version,
"build_type" => if cfg!(release) { "release" } else { "debug" }
},
),
move || start.elapsed().as_secs_f64(),
);
let s3_blob = S3BlobMetrics::new(registry);
let columnar = ColumnarMetrics::new(
registry,
&s3_blob.lgbytes,
Arc::clone(&cfg.configs),
cfg.is_cc_active,
);
Metrics {
blob: vecs.blob_metrics(),
consensus: vecs.consensus_metrics(),
cmds: vecs.cmds_metrics(registry),
retries: vecs.retries_metrics(),
codecs: vecs.codecs_metrics(),
user: BatchWriteMetrics::new(registry, "user"),
read: vecs.batch_part_read_metrics(),
compaction: CompactionMetrics::new(registry),
gc: GcMetrics::new(registry),
lease: LeaseMetrics::new(registry),
state: StateMetrics::new(registry),
shards: ShardsMetrics::new(registry),
audit: UsageAuditMetrics::new(registry),
locks: vecs.locks_metrics(),
watch: WatchMetrics::new(registry),
pubsub_client: PubSubClientMetrics::new(registry),
pushdown: PushdownMetrics::new(registry),
consolidation: ConsolidationMetrics::new(registry),
blob_cache_mem: BlobMemCache::new(registry),
tasks: TasksMetrics::new(registry),
columnar,
schema: SchemaMetrics::new(registry),
inline: InlineMetrics::new(registry),
semaphore: SemaphoreMetrics::new(cfg.clone(), registry.clone()),
sink: SinkMetrics::new(registry),
s3_blob,
postgres_consensus: PostgresClientMetrics::new(registry, "mz_persist"),
_vecs: vecs,
_uptime: uptime,
registry: registry.clone(),
}
}
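    /// Returns the lifetime write amplification observed so far: total bytes
    /// written to blob storage divided by the logical ("good") bytes of user
    /// data written. Note that the ratio is NaN/inf until both counters have
    /// seen their first writes.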
pub fn write_amplification(&self) -> f64 {
let total_written = self.blob.set.bytes.get();
let user_written = self.user.goodbytes.get();
#[allow(clippy::as_conversions)]
{
total_written as f64 / user_written as f64
}
}
}
#[derive(Debug)]
struct MetricsVecs {
cmd_started: IntCounterVec,
cmd_cas_mismatch: IntCounterVec,
cmd_succeeded: IntCounterVec,
cmd_failed: IntCounterVec,
cmd_seconds: CounterVec,
external_op_started: IntCounterVec,
external_op_succeeded: IntCounterVec,
external_op_failed: IntCounterVec,
external_op_bytes: IntCounterVec,
external_op_seconds: CounterVec,
external_consensus_truncated_count: IntCounter,
external_blob_delete_noop_count: IntCounter,
external_blob_sizes: Histogram,
external_rtt_latency: GaugeVec,
external_op_latency: HistogramVec,
retry_started: IntCounterVec,
retry_finished: IntCounterVec,
retry_retries: IntCounterVec,
retry_sleep_seconds: CounterVec,
encode_count: IntCounterVec,
encode_seconds: CounterVec,
decode_count: IntCounterVec,
decode_seconds: CounterVec,
read_part_bytes: IntCounterVec,
read_part_goodbytes: IntCounterVec,
read_part_count: IntCounterVec,
read_part_seconds: CounterVec,
read_ts_rewrite: IntCounterVec,
lock_acquire_count: IntCounterVec,
lock_blocking_acquire_count: IntCounterVec,
lock_blocking_seconds: CounterVec,
alerts_metrics: Arc<AlertsMetrics>,
}
impl MetricsVecs {
fn new(registry: &MetricsRegistry) -> Self {
MetricsVecs {
cmd_started: registry.register(metric!(
name: "mz_persist_cmd_started_count",
help: "count of commands started",
var_labels: ["cmd"],
)),
cmd_cas_mismatch: registry.register(metric!(
name: "mz_persist_cmd_cas_mismatch_count",
help: "count of command retries from CaS mismatch",
var_labels: ["cmd"],
)),
cmd_succeeded: registry.register(metric!(
name: "mz_persist_cmd_succeeded_count",
help: "count of commands succeeded",
var_labels: ["cmd"],
)),
cmd_failed: registry.register(metric!(
name: "mz_persist_cmd_failed_count",
help: "count of commands failed",
var_labels: ["cmd"],
)),
cmd_seconds: registry.register(metric!(
name: "mz_persist_cmd_seconds",
help: "time spent applying commands",
var_labels: ["cmd"],
)),
external_op_started: registry.register(metric!(
name: "mz_persist_external_started_count",
help: "count of external service calls started",
var_labels: ["op"],
)),
external_op_succeeded: registry.register(metric!(
name: "mz_persist_external_succeeded_count",
help: "count of external service calls succeeded",
var_labels: ["op"],
)),
external_op_failed: registry.register(metric!(
name: "mz_persist_external_failed_count",
help: "count of external service calls failed",
var_labels: ["op"],
)),
external_op_bytes: registry.register(metric!(
name: "mz_persist_external_bytes_count",
help: "total size represented by external service calls",
var_labels: ["op"],
)),
external_op_seconds: registry.register(metric!(
name: "mz_persist_external_seconds",
help: "time spent in external service calls",
var_labels: ["op"],
)),
external_consensus_truncated_count: registry.register(metric!(
name: "mz_persist_external_consensus_truncated_count",
help: "count of versions deleted by consensus truncate calls",
)),
external_blob_delete_noop_count: registry.register(metric!(
name: "mz_persist_external_blob_delete_noop_count",
help: "count of blob delete calls that deleted a non-existent key",
)),
external_blob_sizes: registry.register(metric!(
name: "mz_persist_external_blob_sizes",
help: "histogram of blob sizes at put time",
buckets: mz_ore::stats::HISTOGRAM_BYTE_BUCKETS.to_vec(),
)),
external_rtt_latency: registry.register(metric!(
name: "mz_persist_external_rtt_latency",
help: "roundtrip-time to external service as seen by this process",
var_labels: ["external"],
)),
external_op_latency: registry.register(metric!(
name: "mz_persist_external_op_latency",
help: "rountrip latency observed by individual performance-critical operations",
var_labels: ["op"],
buckets: histogram_seconds_buckets(0.000_500, 32.0),
)),
retry_started: registry.register(metric!(
name: "mz_persist_retry_started_count",
help: "count of retry loops started",
var_labels: ["op"],
)),
retry_finished: registry.register(metric!(
name: "mz_persist_retry_finished_count",
help: "count of retry loops finished",
var_labels: ["op"],
)),
retry_retries: registry.register(metric!(
name: "mz_persist_retry_retries_count",
help: "count of total attempts by retry loops",
var_labels: ["op"],
)),
retry_sleep_seconds: registry.register(metric!(
name: "mz_persist_retry_sleep_seconds",
help: "time spent in retry loop backoff",
var_labels: ["op"],
)),
encode_count: registry.register(metric!(
name: "mz_persist_encode_count",
help: "count of op encodes",
var_labels: ["op"],
)),
encode_seconds: registry.register(metric!(
name: "mz_persist_encode_seconds",
help: "time spent in op encodes",
var_labels: ["op"],
)),
decode_count: registry.register(metric!(
name: "mz_persist_decode_count",
help: "count of op decodes",
var_labels: ["op"],
)),
decode_seconds: registry.register(metric!(
name: "mz_persist_decode_seconds",
help: "time spent in op decodes",
var_labels: ["op"],
)),
read_part_bytes: registry.register(metric!(
name: "mz_persist_read_batch_part_bytes",
help: "total encoded size of batch parts read",
var_labels: ["op"],
)),
read_part_goodbytes: registry.register(metric!(
name: "mz_persist_read_batch_part_goodbytes",
help: "total logical size of batch parts read",
var_labels: ["op"],
)),
read_part_count: registry.register(metric!(
name: "mz_persist_read_batch_part_count",
help: "count of batch parts read",
var_labels: ["op"],
)),
read_part_seconds: registry.register(metric!(
name: "mz_persist_read_batch_part_seconds",
help: "time spent reading batch parts",
var_labels: ["op"],
)),
read_ts_rewrite: registry.register(metric!(
name: "mz_persist_read_ts_rewite",
help: "count of updates read with rewritten ts",
var_labels: ["op"],
)),
lock_acquire_count: registry.register(metric!(
name: "mz_persist_lock_acquire_count",
help: "count of locks acquired",
var_labels: ["op"],
)),
lock_blocking_acquire_count: registry.register(metric!(
name: "mz_persist_lock_blocking_acquire_count",
help: "count of locks acquired that required blocking",
var_labels: ["op"],
)),
lock_blocking_seconds: registry.register(metric!(
name: "mz_persist_lock_blocking_seconds",
help: "time spent blocked for a lock",
var_labels: ["op"],
)),
alerts_metrics: Arc::new(AlertsMetrics::new(registry)),
}
}
fn cmds_metrics(&self, registry: &MetricsRegistry) -> CmdsMetrics {
CmdsMetrics {
init_state: self.cmd_metrics("init_state"),
add_rollup: self.cmd_metrics("add_rollup"),
remove_rollups: self.cmd_metrics("remove_rollups"),
register: self.cmd_metrics("register"),
compare_and_append: self.cmd_metrics("compare_and_append"),
compare_and_append_noop: registry.register(metric!(
name: "mz_persist_cmd_compare_and_append_noop",
help: "count of compare_and_append retries that were discoverd to have already committed",
)),
compare_and_downgrade_since: self.cmd_metrics("compare_and_downgrade_since"),
downgrade_since: self.cmd_metrics("downgrade_since"),
heartbeat_reader: self.cmd_metrics("heartbeat_reader"),
expire_reader: self.cmd_metrics("expire_reader"),
expire_writer: self.cmd_metrics("expire_writer"),
merge_res: self.cmd_metrics("merge_res"),
become_tombstone: self.cmd_metrics("become_tombstone"),
compare_and_evolve_schema: self.cmd_metrics("compare_and_evolve_schema"),
spine_exert: self.cmd_metrics("spine_exert"),
fetch_upper_count: registry.register(metric!(
name: "mz_persist_cmd_fetch_upper_count",
help: "count of fetch_upper calls",
))
}
}
fn cmd_metrics(&self, cmd: &str) -> CmdMetrics {
CmdMetrics {
name: cmd.to_owned(),
started: self.cmd_started.with_label_values(&[cmd]),
succeeded: self.cmd_succeeded.with_label_values(&[cmd]),
cas_mismatch: self.cmd_cas_mismatch.with_label_values(&[cmd]),
failed: self.cmd_failed.with_label_values(&[cmd]),
seconds: self.cmd_seconds.with_label_values(&[cmd]),
}
}
fn retries_metrics(&self) -> RetriesMetrics {
RetriesMetrics {
determinate: RetryDeterminate {
apply_unbatched_cmd_cas: self.retry_metrics("apply_unbatched_cmd::cas"),
},
external: RetryExternal {
batch_delete: Arc::new(self.retry_metrics("batch::delete")),
batch_set: self.retry_metrics("batch::set"),
blob_open: self.retry_metrics("blob::open"),
compaction_noop_delete: Arc::new(self.retry_metrics("compaction_noop::delete")),
consensus_open: self.retry_metrics("consensus::open"),
fetch_batch_get: self.retry_metrics("fetch_batch::get"),
fetch_state_scan: self.retry_metrics("fetch_state::scan"),
gc_truncate: self.retry_metrics("gc::truncate"),
maybe_init_cas: self.retry_metrics("maybe_init::cas"),
rollup_delete: self.retry_metrics("rollup::delete"),
rollup_get: self.retry_metrics("rollup::get"),
rollup_set: self.retry_metrics("rollup::set"),
hollow_run_get: self.retry_metrics("hollow_run::get"),
hollow_run_set: self.retry_metrics("hollow_run::set"),
storage_usage_shard_size: self.retry_metrics("storage_usage::shard_size"),
},
compare_and_append_idempotent: self.retry_metrics("compare_and_append_idempotent"),
fetch_latest_state: self.retry_metrics("fetch_latest_state"),
fetch_live_states: self.retry_metrics("fetch_live_states"),
idempotent_cmd: self.retry_metrics("idempotent_cmd"),
next_listen_batch: self.retry_metrics("next_listen_batch"),
snapshot: self.retry_metrics("snapshot"),
}
}
fn retry_metrics(&self, name: &str) -> RetryMetrics {
RetryMetrics {
name: name.to_owned(),
started: self.retry_started.with_label_values(&[name]),
finished: self.retry_finished.with_label_values(&[name]),
retries: self.retry_retries.with_label_values(&[name]),
sleep_seconds: self.retry_sleep_seconds.with_label_values(&[name]),
}
}
fn codecs_metrics(&self) -> CodecsMetrics {
CodecsMetrics {
state: self.codec_metrics("state"),
state_diff: self.codec_metrics("state_diff"),
batch: self.codec_metrics("batch"),
key: self.codec_metrics("key"),
val: self.codec_metrics("val"),
}
}
fn codec_metrics(&self, op: &str) -> CodecMetrics {
CodecMetrics {
encode_count: self.encode_count.with_label_values(&[op]),
encode_seconds: self.encode_seconds.with_label_values(&[op]),
decode_count: self.decode_count.with_label_values(&[op]),
decode_seconds: self.decode_seconds.with_label_values(&[op]),
}
}
fn blob_metrics(&self) -> BlobMetrics {
BlobMetrics {
set: self.external_op_metrics("blob_set", true),
get: self.external_op_metrics("blob_get", true),
list_keys: self.external_op_metrics("blob_list_keys", false),
delete: self.external_op_metrics("blob_delete", false),
restore: self.external_op_metrics("restore", false),
delete_noop: self.external_blob_delete_noop_count.clone(),
blob_sizes: self.external_blob_sizes.clone(),
rtt_latency: self.external_rtt_latency.with_label_values(&["blob"]),
}
}
fn consensus_metrics(&self) -> ConsensusMetrics {
ConsensusMetrics {
list_keys: self.external_op_metrics("consensus_list_keys", false),
head: self.external_op_metrics("consensus_head", false),
compare_and_set: self.external_op_metrics("consensus_cas", true),
scan: self.external_op_metrics("consensus_scan", false),
truncate: self.external_op_metrics("consensus_truncate", false),
truncated_count: self.external_consensus_truncated_count.clone(),
rtt_latency: self.external_rtt_latency.with_label_values(&["consensus"]),
}
}
fn external_op_metrics(&self, op: &str, latency_histogram: bool) -> ExternalOpMetrics {
ExternalOpMetrics {
started: self.external_op_started.with_label_values(&[op]),
succeeded: self.external_op_succeeded.with_label_values(&[op]),
failed: self.external_op_failed.with_label_values(&[op]),
bytes: self.external_op_bytes.with_label_values(&[op]),
seconds: self.external_op_seconds.with_label_values(&[op]),
seconds_histogram: if latency_histogram {
Some(self.external_op_latency.with_label_values(&[op]))
} else {
None
},
alerts_metrics: Arc::clone(&self.alerts_metrics),
}
}
fn batch_part_read_metrics(&self) -> BatchPartReadMetrics {
BatchPartReadMetrics {
listen: self.read_metrics("listen"),
snapshot: self.read_metrics("snapshot"),
batch_fetcher: self.read_metrics("batch_fetcher"),
compaction: self.read_metrics("compaction"),
unindexed: self.read_metrics("unindexed"),
}
}
fn read_metrics(&self, op: &str) -> ReadMetrics {
ReadMetrics {
part_bytes: self.read_part_bytes.with_label_values(&[op]),
part_goodbytes: self.read_part_goodbytes.with_label_values(&[op]),
part_count: self.read_part_count.with_label_values(&[op]),
seconds: self.read_part_seconds.with_label_values(&[op]),
ts_rewrite: self.read_ts_rewrite.with_label_values(&[op]),
}
}
fn locks_metrics(&self) -> LocksMetrics {
LocksMetrics {
applier_read_cacheable: self.lock_metrics("applier_read_cacheable"),
applier_read_noncacheable: self.lock_metrics("applier_read_noncacheable"),
applier_write: self.lock_metrics("applier_write"),
watch: self.lock_metrics("watch"),
}
}
fn lock_metrics(&self, op: &str) -> LockMetrics {
LockMetrics {
acquire_count: self.lock_acquire_count.with_label_values(&[op]),
blocking_acquire_count: self.lock_blocking_acquire_count.with_label_values(&[op]),
blocking_seconds: self.lock_blocking_seconds.with_label_values(&[op]),
}
}
}
#[derive(Debug)]
pub struct CmdMetrics {
pub(crate) name: String,
pub(crate) started: IntCounter,
pub(crate) cas_mismatch: IntCounter,
pub(crate) succeeded: IntCounter,
pub(crate) failed: IntCounter,
pub(crate) seconds: Counter,
}
impl CmdMetrics {
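    /// Runs `cmd_fn`, recording its elapsed seconds plus started, succeeded,
    /// and failed counts. On success, the per-shard success counter in
    /// `shard_metrics` is also incremented; failures are only counted against
    /// this command.
    ///
    /// A minimal sketch of the intended usage (`apply` and the surrounding
    /// names are hypothetical stand-ins):
    ///
    /// ```ignore
    /// let res = metrics
    ///     .cmds
    ///     .add_rollup
    ///     .run_cmd(&shard_metrics, || async { apply().await })
    ///     .await;
    /// ```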
pub async fn run_cmd<R, E, F, CmdFn>(
&self,
shard_metrics: &ShardMetrics,
cmd_fn: CmdFn,
) -> Result<R, E>
where
F: std::future::Future<Output = Result<R, E>>,
CmdFn: FnOnce() -> F,
{
self.started.inc();
let start = Instant::now();
let res = cmd_fn().await;
self.seconds.inc_by(start.elapsed().as_secs_f64());
match res.as_ref() {
Ok(_) => {
self.succeeded.inc();
shard_metrics.cmd_succeeded.inc();
}
Err(_) => self.failed.inc(),
};
res
}
}
#[derive(Debug)]
pub struct CmdsMetrics {
pub(crate) init_state: CmdMetrics,
pub(crate) add_rollup: CmdMetrics,
pub(crate) remove_rollups: CmdMetrics,
pub(crate) register: CmdMetrics,
pub(crate) compare_and_append: CmdMetrics,
pub(crate) compare_and_append_noop: IntCounter,
pub(crate) compare_and_downgrade_since: CmdMetrics,
pub(crate) downgrade_since: CmdMetrics,
pub(crate) heartbeat_reader: CmdMetrics,
pub(crate) expire_reader: CmdMetrics,
pub(crate) expire_writer: CmdMetrics,
pub(crate) merge_res: CmdMetrics,
pub(crate) become_tombstone: CmdMetrics,
pub(crate) compare_and_evolve_schema: CmdMetrics,
pub(crate) spine_exert: CmdMetrics,
pub(crate) fetch_upper_count: IntCounter,
}
#[derive(Debug)]
pub struct RetryMetrics {
pub(crate) name: String,
pub(crate) started: IntCounter,
pub(crate) finished: IntCounter,
pub(crate) retries: IntCounter,
pub(crate) sleep_seconds: Counter,
}
impl RetryMetrics {
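    /// Wraps `retry` so each attempt and backoff is recorded against these
    /// metrics, counting the loop as started immediately.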
pub(crate) fn stream(&self, retry: RetryStream) -> MetricsRetryStream {
MetricsRetryStream::new(retry, self)
}
}
#[derive(Debug)]
pub struct RetryDeterminate {
pub(crate) apply_unbatched_cmd_cas: RetryMetrics,
}
#[derive(Debug)]
pub struct RetryExternal {
pub(crate) batch_delete: Arc<RetryMetrics>,
pub(crate) batch_set: RetryMetrics,
pub(crate) blob_open: RetryMetrics,
pub(crate) compaction_noop_delete: Arc<RetryMetrics>,
pub(crate) consensus_open: RetryMetrics,
pub(crate) fetch_batch_get: RetryMetrics,
pub(crate) fetch_state_scan: RetryMetrics,
pub(crate) gc_truncate: RetryMetrics,
pub(crate) maybe_init_cas: RetryMetrics,
pub(crate) rollup_delete: RetryMetrics,
pub(crate) rollup_get: RetryMetrics,
pub(crate) rollup_set: RetryMetrics,
pub(crate) hollow_run_get: RetryMetrics,
pub(crate) hollow_run_set: RetryMetrics,
pub(crate) storage_usage_shard_size: RetryMetrics,
}
#[derive(Debug)]
pub struct RetriesMetrics {
pub(crate) determinate: RetryDeterminate,
pub(crate) external: RetryExternal,
pub(crate) compare_and_append_idempotent: RetryMetrics,
pub(crate) fetch_latest_state: RetryMetrics,
pub(crate) fetch_live_states: RetryMetrics,
pub(crate) idempotent_cmd: RetryMetrics,
pub(crate) next_listen_batch: RetryMetrics,
pub(crate) snapshot: RetryMetrics,
}
#[derive(Debug)]
pub struct BatchPartReadMetrics {
pub(crate) listen: ReadMetrics,
pub(crate) snapshot: ReadMetrics,
pub(crate) batch_fetcher: ReadMetrics,
pub(crate) compaction: ReadMetrics,
pub(crate) unindexed: ReadMetrics,
}
#[derive(Debug, Clone)]
pub struct ReadMetrics {
pub(crate) part_bytes: IntCounter,
pub(crate) part_goodbytes: IntCounter,
pub(crate) part_count: IntCounter,
pub(crate) seconds: Counter,
pub(crate) ts_rewrite: IntCounter,
}
#[derive(Debug, Clone)]
pub struct BatchWriteMetrics {
pub(crate) bytes: IntCounter,
pub(crate) goodbytes: IntCounter,
pub(crate) seconds: Counter,
pub(crate) write_stalls: IntCounter,
pub(crate) key_lower_too_big: IntCounter,
pub(crate) unordered: IntCounter,
pub(crate) codec_order: IntCounter,
pub(crate) structured_order: IntCounter,
_order_counts: IntCounterVec,
pub(crate) step_stats: Counter,
pub(crate) step_part_writing: Counter,
pub(crate) step_inline: Counter,
}
impl BatchWriteMetrics {
fn new(registry: &MetricsRegistry, name: &str) -> Self {
let order_counts: IntCounterVec = registry.register(metric!(
name: format!("mz_persist_{}_write_batch_order", name),
help: "count of batches by the data ordering",
var_labels: ["order"],
));
let unordered = order_counts.with_label_values(&["unordered"]);
let codec_order = order_counts.with_label_values(&["codec"]);
let structured_order = order_counts.with_label_values(&["structured"]);
BatchWriteMetrics {
bytes: registry.register(metric!(
name: format!("mz_persist_{}_bytes", name),
help: format!("total encoded size of {} batches written", name),
)),
goodbytes: registry.register(metric!(
name: format!("mz_persist_{}_goodbytes", name),
help: format!("total logical size of {} batches written", name),
)),
seconds: registry.register(metric!(
name: format!("mz_persist_{}_write_batch_part_seconds", name),
help: format!("time spent writing {} batches", name),
)),
write_stalls: registry.register(metric!(
name: format!("mz_persist_{}_write_stall_count", name),
help: format!(
"count of {} writes stalling to await max outstanding reqs",
name
),
)),
key_lower_too_big: registry.register(metric!(
name: format!("mz_persist_{}_key_lower_too_big", name),
help: format!(
"count of {} writes that were unable to write a key lower, because the size threshold was too low",
name
),
)),
unordered,
codec_order,
structured_order,
_order_counts: order_counts,
step_stats: registry.register(metric!(
name: format!("mz_persist_{}_step_stats", name),
help: format!("time spent computing {} update stats", name),
)),
step_part_writing: registry.register(metric!(
name: format!("mz_persist_{}_step_part_writing", name),
help: format!("blocking time spent writing parts for {} updates", name),
)),
step_inline: registry.register(metric!(
name: format!("mz_persist_{}_step_inline", name),
help: format!("time spent encoding {} inline batches", name)
)),
}
}
}
#[derive(Debug)]
pub struct CompactionMetrics {
pub(crate) requested: IntCounter,
pub(crate) dropped: IntCounter,
pub(crate) skipped: IntCounter,
pub(crate) started: IntCounter,
pub(crate) applied: IntCounter,
pub(crate) timed_out: IntCounter,
pub(crate) failed: IntCounter,
pub(crate) noop: IntCounter,
pub(crate) seconds: Counter,
pub(crate) concurrency_waits: IntCounter,
pub(crate) queued_seconds: Counter,
pub(crate) memory_violations: IntCounter,
pub(crate) runs_compacted: IntCounter,
pub(crate) chunks_compacted: IntCounter,
pub(crate) not_all_prefetched: IntCounter,
pub(crate) parts_prefetched: IntCounter,
pub(crate) parts_waited: IntCounter,
pub(crate) fast_path_eligible: IntCounter,
pub(crate) admin_count: IntCounter,
pub(crate) applied_exact_match: IntCounter,
pub(crate) applied_subset_match: IntCounter,
pub(crate) not_applied_too_many_updates: IntCounter,
pub(crate) batch: BatchWriteMetrics,
pub(crate) steps: CompactionStepTimings,
pub(crate) _steps_vec: CounterVec,
}
impl CompactionMetrics {
fn new(registry: &MetricsRegistry) -> Self {
let step_timings: CounterVec = registry.register(metric!(
name: "mz_persist_compaction_step_seconds",
help: "time spent on individual steps of compaction",
var_labels: ["step"],
));
CompactionMetrics {
requested: registry.register(metric!(
name: "mz_persist_compaction_requested",
help: "count of total compaction requests",
)),
dropped: registry.register(metric!(
name: "mz_persist_compaction_dropped",
help: "count of total compaction requests dropped due to a full queue",
)),
skipped: registry.register(metric!(
name: "mz_persist_compaction_skipped",
help: "count of compactions skipped due to heuristics",
)),
started: registry.register(metric!(
name: "mz_persist_compaction_started",
help: "count of compactions started",
)),
failed: registry.register(metric!(
name: "mz_persist_compaction_failed",
help: "count of compactions failed",
)),
applied: registry.register(metric!(
name: "mz_persist_compaction_applied",
help: "count of compactions applied to state",
)),
timed_out: registry.register(metric!(
name: "mz_persist_compaction_timed_out",
help: "count of compactions that timed out",
)),
noop: registry.register(metric!(
name: "mz_persist_compaction_noop",
help: "count of compactions discarded (obsolete)",
)),
seconds: registry.register(metric!(
name: "mz_persist_compaction_seconds",
help: "time spent in compaction",
)),
concurrency_waits: registry.register(metric!(
name: "mz_persist_compaction_concurrency_waits",
help: "count of compaction requests that ever blocked due to concurrency limit",
)),
queued_seconds: registry.register(metric!(
name: "mz_persist_compaction_queued_seconds",
help: "time that compaction requests spent queued",
)),
memory_violations: registry.register(metric!(
name: "mz_persist_compaction_memory_violations",
help: "count of compaction memory requirement violations",
)),
runs_compacted: registry.register(metric!(
name: "mz_persist_compaction_runs_compacted",
help: "count of runs compacted",
)),
chunks_compacted: registry.register(metric!(
name: "mz_persist_compaction_chunks_compacted",
help: "count of run chunks compacted",
)),
not_all_prefetched: registry.register(metric!(
name: "mz_persist_compaction_not_all_prefetched",
help: "count of compactions where not all inputs were prefetched",
)),
parts_prefetched: registry.register(metric!(
name: "mz_persist_compaction_parts_prefetched",
help: "count of compaction parts completely prefetched by the time they're needed",
)),
parts_waited: registry.register(metric!(
name: "mz_persist_compaction_parts_waited",
help: "count of compaction parts that had to be waited on",
)),
fast_path_eligible: registry.register(metric!(
name: "mz_persist_compaction_fast_path_eligible",
help: "count of compaction requests that could have used the fast-path optimization",
)),
admin_count: registry.register(metric!(
name: "mz_persist_compaction_admin_count",
help: "count of compaction requests that were performed by admin tooling",
)),
applied_exact_match: registry.register(metric!(
name: "mz_persist_compaction_applied_exact_match",
help: "count of merge results that exactly replaced a SpineBatch",
)),
applied_subset_match: registry.register(metric!(
name: "mz_persist_compaction_applied_subset_match",
help: "count of merge results that replaced a subset of a SpineBatch",
)),
not_applied_too_many_updates: registry.register(metric!(
name: "mz_persist_compaction_not_applied_too_many_updates",
help: "count of merge results that did not apply due to too many updates",
)),
batch: BatchWriteMetrics::new(registry, "compaction"),
steps: CompactionStepTimings::new(step_timings.clone()),
_steps_vec: step_timings,
}
}
}
#[derive(Debug)]
pub struct CompactionStepTimings {
pub(crate) part_fetch_seconds: Counter,
pub(crate) heap_population_seconds: Counter,
}
impl CompactionStepTimings {
fn new(step_timings: CounterVec) -> CompactionStepTimings {
CompactionStepTimings {
part_fetch_seconds: step_timings.with_label_values(&["part_fetch"]),
heap_population_seconds: step_timings.with_label_values(&["heap_population"]),
}
}
}
#[derive(Debug)]
pub struct GcMetrics {
pub(crate) noop: IntCounter,
pub(crate) started: IntCounter,
pub(crate) finished: IntCounter,
pub(crate) merged: IntCounter,
pub(crate) seconds: Counter,
pub(crate) steps: GcStepTimings,
}
#[derive(Debug)]
pub struct GcStepTimings {
pub(crate) find_removable_rollups: Counter,
pub(crate) fetch_seconds: Counter,
pub(crate) find_deletable_blobs_seconds: Counter,
pub(crate) delete_rollup_seconds: Counter,
pub(crate) delete_batch_part_seconds: Counter,
pub(crate) truncate_diff_seconds: Counter,
pub(crate) remove_rollups_from_state: Counter,
pub(crate) post_gc_calculations_seconds: Counter,
}
impl GcStepTimings {
fn new(step_timings: CounterVec) -> Self {
Self {
find_removable_rollups: step_timings.with_label_values(&["find_removable_rollups"]),
fetch_seconds: step_timings.with_label_values(&["fetch"]),
find_deletable_blobs_seconds: step_timings.with_label_values(&["find_deletable_blobs"]),
delete_rollup_seconds: step_timings.with_label_values(&["delete_rollup"]),
delete_batch_part_seconds: step_timings.with_label_values(&["delete_batch_part"]),
truncate_diff_seconds: step_timings.with_label_values(&["truncate_diff"]),
remove_rollups_from_state: step_timings
.with_label_values(&["remove_rollups_from_state"]),
post_gc_calculations_seconds: step_timings.with_label_values(&["post_gc_calculations"]),
}
}
}
impl GcMetrics {
fn new(registry: &MetricsRegistry) -> Self {
let step_timings: CounterVec = registry.register(metric!(
name: "mz_persist_gc_step_seconds",
help: "time spent on individual steps of gc",
var_labels: ["step"],
));
GcMetrics {
noop: registry.register(metric!(
name: "mz_persist_gc_noop",
help: "count of garbage collections skipped because they were already done",
)),
started: registry.register(metric!(
name: "mz_persist_gc_started",
help: "count of garbage collections started",
)),
finished: registry.register(metric!(
name: "mz_persist_gc_finished",
help: "count of garbage collections finished",
)),
merged: registry.register(metric!(
name: "mz_persist_gc_merged_reqs",
help: "count of garbage collection requests merged",
)),
seconds: registry.register(metric!(
name: "mz_persist_gc_seconds",
help: "time spent in garbage collections",
)),
steps: GcStepTimings::new(step_timings),
}
}
}
#[derive(Debug)]
pub struct LeaseMetrics {
pub(crate) timeout_read: IntCounter,
pub(crate) dropped_part: IntCounter,
}
impl LeaseMetrics {
fn new(registry: &MetricsRegistry) -> Self {
LeaseMetrics {
timeout_read: registry.register(metric!(
name: "mz_persist_lease_timeout_read",
help: "count of readers whose lease timed out",
)),
dropped_part: registry.register(metric!(
name: "mz_persist_lease_dropped_part",
help: "count of LeasedBatchParts that were dropped without being politely returned",
)),
}
}
}
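/// Increments the wrapped counter on drop, so that a retry loop is counted as
/// finished no matter how it exits.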
struct IncOnDrop(IntCounter);
impl Drop for IncOnDrop {
fn drop(&mut self) {
self.0.inc()
}
}
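/// A [RetryStream] that records retries and backoff time as metrics, and
/// counts the overall loop as finished when dropped.
///
/// A minimal sketch of driving such a loop (`fetch` and `retry_stream` are
/// hypothetical stand-ins):
///
/// ```ignore
/// let mut retry = metrics.retries.fetch_latest_state.stream(retry_stream);
/// let state = loop {
///     match fetch().await {
///         Ok(x) => break x,
///         Err(_) => retry = retry.sleep().await,
///     }
/// };
/// ```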
pub struct MetricsRetryStream {
retry: RetryStream,
pub(crate) retries: IntCounter,
sleep_seconds: Counter,
_finished: IncOnDrop,
}
impl MetricsRetryStream {
pub fn new(retry: RetryStream, metrics: &RetryMetrics) -> Self {
metrics.started.inc();
MetricsRetryStream {
retry,
retries: metrics.retries.clone(),
sleep_seconds: metrics.sleep_seconds.clone(),
_finished: IncOnDrop(metrics.finished.clone()),
}
}
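    /// How many times [Self::sleep] has been called.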
pub fn attempt(&self) -> usize {
self.retry.attempt()
}
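    /// The next sleep (without jitter, for easy printing in logs).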
pub fn next_sleep(&self) -> Duration {
self.retry.next_sleep()
}
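    /// Executes the next sleep in the series, recording the attempt and the
    /// (pre-jitter) sleep duration.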
pub async fn sleep(self) -> Self {
self.retries.inc();
self.sleep_seconds
.inc_by(self.retry.next_sleep().as_secs_f64());
let retry = self.retry.sleep().await;
MetricsRetryStream {
retry,
retries: self.retries,
sleep_seconds: self.sleep_seconds,
_finished: self._finished,
}
}
}
#[derive(Debug)]
pub struct CodecsMetrics {
pub(crate) state: CodecMetrics,
pub(crate) state_diff: CodecMetrics,
pub(crate) batch: CodecMetrics,
pub(crate) key: CodecMetrics,
pub(crate) val: CodecMetrics,
}
#[derive(Debug)]
pub struct CodecMetrics {
pub(crate) encode_count: IntCounter,
pub(crate) encode_seconds: Counter,
pub(crate) decode_count: IntCounter,
pub(crate) decode_seconds: Counter,
}
impl CodecMetrics {
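    /// Times `f`, recording the elapsed seconds and one encode against these
    /// metrics.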
pub(crate) fn encode<R, F: FnOnce() -> R>(&self, f: F) -> R {
let now = Instant::now();
let r = f();
self.encode_count.inc();
self.encode_seconds.inc_by(now.elapsed().as_secs_f64());
r
}
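    /// Times `f`, recording the elapsed seconds and one decode against these
    /// metrics.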
pub(crate) fn decode<R, F: FnOnce() -> R>(&self, f: F) -> R {
let now = Instant::now();
let r = f();
self.decode_count.inc();
self.decode_seconds.inc_by(now.elapsed().as_secs_f64());
r
}
}
#[derive(Debug)]
pub struct StateMetrics {
pub(crate) apply_spine_fast_path: IntCounter,
pub(crate) apply_spine_slow_path: IntCounter,
pub(crate) apply_spine_slow_path_lenient: IntCounter,
pub(crate) apply_spine_slow_path_lenient_adjustment: IntCounter,
pub(crate) apply_spine_slow_path_with_reconstruction: IntCounter,
pub(crate) apply_spine_flattened: IntCounter,
pub(crate) update_state_noop_path: IntCounter,
pub(crate) update_state_empty_path: IntCounter,
pub(crate) update_state_fast_path: IntCounter,
pub(crate) update_state_slow_path: IntCounter,
pub(crate) rollup_at_seqno_migration: IntCounter,
pub(crate) fetch_recent_live_diffs_fast_path: IntCounter,
pub(crate) fetch_recent_live_diffs_slow_path: IntCounter,
pub(crate) writer_added: IntCounter,
pub(crate) writer_removed: IntCounter,
pub(crate) force_apply_hostname: IntCounter,
pub(crate) rollup_write_success: IntCounter,
pub(crate) rollup_write_noop_latest: IntCounter,
pub(crate) rollup_write_noop_truncated: IntCounter,
}
impl StateMetrics {
pub(crate) fn new(registry: &MetricsRegistry) -> Self {
let rollup_write_noop: IntCounterVec = registry.register(metric!(
name: "mz_persist_state_rollup_write_noop",
help: "count of no-op rollup writes",
var_labels: ["reason"],
));
StateMetrics {
apply_spine_fast_path: registry.register(metric!(
name: "mz_persist_state_apply_spine_fast_path",
help: "count of spine diff applications that hit the fast path",
)),
apply_spine_slow_path: registry.register(metric!(
name: "mz_persist_state_apply_spine_slow_path",
help: "count of spine diff applications that hit the slow path",
)),
apply_spine_slow_path_lenient: registry.register(metric!(
name: "mz_persist_state_apply_spine_slow_path_lenient",
help: "count of spine diff applications that hit the lenient compaction apply path",
)),
apply_spine_slow_path_lenient_adjustment: registry.register(metric!(
name: "mz_persist_state_apply_spine_slow_path_lenient_adjustment",
help: "count of adjustments made by the lenient compaction apply path",
)),
apply_spine_slow_path_with_reconstruction: registry.register(metric!(
name: "mz_persist_state_apply_spine_slow_path_with_reconstruction",
help: "count of spine diff applications that hit the slow path with extra spine reconstruction step",
)),
apply_spine_flattened: registry.register(metric!(
name: "mz_persist_state_apply_spine_flattened",
help: "count of spine diff applications that flatten the trace",
)),
update_state_noop_path: registry.register(metric!(
name: "mz_persist_state_update_state_noop_path",
help: "count of state update applications that no-oped due to shared state",
)),
update_state_empty_path: registry.register(metric!(
name: "mz_persist_state_update_state_empty_path",
help: "count of state update applications that found no new updates",
)),
update_state_fast_path: registry.register(metric!(
name: "mz_persist_state_update_state_fast_path",
help: "count of state update applications that hit the fast path",
)),
update_state_slow_path: registry.register(metric!(
name: "mz_persist_state_update_state_slow_path",
help: "count of state update applications that hit the slow path",
)),
rollup_at_seqno_migration: registry.register(metric!(
name: "mz_persist_state_rollup_at_seqno_migration",
help: "count of fetch_rollup_at_seqno calls that only worked because of the migration",
)),
fetch_recent_live_diffs_fast_path: registry.register(metric!(
name: "mz_persist_state_fetch_recent_live_diffs_fast_path",
help: "count of fetch_recent_live_diffs that hit the fast path",
)),
fetch_recent_live_diffs_slow_path: registry.register(metric!(
name: "mz_persist_state_fetch_recent_live_diffs_slow_path",
help: "count of fetch_recent_live_diffs that hit the slow path",
)),
writer_added: registry.register(metric!(
name: "mz_persist_state_writer_added",
help: "count of writers added to the state",
)),
writer_removed: registry.register(metric!(
name: "mz_persist_state_writer_removed",
help: "count of writers removed from the state",
)),
force_apply_hostname: registry.register(metric!(
name: "mz_persist_state_force_applied_hostname",
help: "count of when hostname diffs needed to be force applied",
)),
rollup_write_success: registry.register(metric!(
name: "mz_persist_state_rollup_write_success",
help: "count of rollups written successful (may not be linked in to state)",
)),
rollup_write_noop_latest: rollup_write_noop.with_label_values(&["latest"]),
rollup_write_noop_truncated: rollup_write_noop.with_label_values(&["truncated"]),
}
}
}
#[derive(Debug)]
pub struct ShardsMetrics {
_count: ComputedIntGauge,
since: mz_ore::metrics::IntGaugeVec,
upper: mz_ore::metrics::IntGaugeVec,
encoded_rollup_size: mz_ore::metrics::UIntGaugeVec,
encoded_diff_size: mz_ore::metrics::IntCounterVec,
hollow_batch_count: mz_ore::metrics::UIntGaugeVec,
spine_batch_count: mz_ore::metrics::UIntGaugeVec,
batch_part_count: mz_ore::metrics::UIntGaugeVec,
batch_part_version_count: mz_ore::metrics::UIntGaugeVec,
batch_part_version_bytes: mz_ore::metrics::UIntGaugeVec,
update_count: mz_ore::metrics::UIntGaugeVec,
rollup_count: mz_ore::metrics::UIntGaugeVec,
largest_batch_size: mz_ore::metrics::UIntGaugeVec,
seqnos_held: mz_ore::metrics::UIntGaugeVec,
seqnos_since_last_rollup: mz_ore::metrics::UIntGaugeVec,
gc_seqno_held_parts: mz_ore::metrics::UIntGaugeVec,
gc_live_diffs: mz_ore::metrics::UIntGaugeVec,
gc_finished: mz_ore::metrics::IntCounterVec,
compaction_applied: mz_ore::metrics::IntCounterVec,
cmd_succeeded: mz_ore::metrics::IntCounterVec,
usage_current_state_batches_bytes: mz_ore::metrics::UIntGaugeVec,
usage_current_state_rollups_bytes: mz_ore::metrics::UIntGaugeVec,
usage_referenced_not_current_state_bytes: mz_ore::metrics::UIntGaugeVec,
usage_not_leaked_not_referenced_bytes: mz_ore::metrics::UIntGaugeVec,
usage_leaked_bytes: mz_ore::metrics::UIntGaugeVec,
pubsub_push_diff_applied: mz_ore::metrics::IntCounterVec,
pubsub_push_diff_not_applied_stale: mz_ore::metrics::IntCounterVec,
pubsub_push_diff_not_applied_out_of_order: mz_ore::metrics::IntCounterVec,
blob_gets: mz_ore::metrics::IntCounterVec,
blob_sets: mz_ore::metrics::IntCounterVec,
live_writers: mz_ore::metrics::UIntGaugeVec,
unconsolidated_snapshot: mz_ore::metrics::IntCounterVec,
backpressure_emitted_bytes: IntCounterVec,
backpressure_last_backpressured_bytes: UIntGaugeVec,
backpressure_retired_bytes: IntCounterVec,
rewrite_part_count: UIntGaugeVec,
inline_part_count: UIntGaugeVec,
inline_part_bytes: UIntGaugeVec,
compact_batches: UIntGaugeVec,
compacting_batches: UIntGaugeVec,
noncompact_batches: UIntGaugeVec,
schema_registry_version_count: UIntGaugeVec,
inline_backpressure_count: IntCounterVec,
shards: Arc<Mutex<BTreeMap<ShardId, Weak<ShardMetrics>>>>,
}
impl ShardsMetrics {
fn new(registry: &MetricsRegistry) -> Self {
let shards = Arc::new(Mutex::new(BTreeMap::new()));
let shards_count = Arc::clone(&shards);
ShardsMetrics {
_count: registry.register_computed_gauge(
metric!(
name: "mz_persist_shard_count",
help: "count of all active shards on this process",
),
move || {
let mut ret = 0;
Self::compute(&shards_count, |_m| ret += 1);
ret
},
),
since: registry.register(metric!(
name: "mz_persist_shard_since",
help: "since by shard",
var_labels: ["shard", "name"],
)),
upper: registry.register(metric!(
name: "mz_persist_shard_upper",
help: "upper by shard",
var_labels: ["shard", "name"],
)),
encoded_rollup_size: registry.register(metric!(
name: "mz_persist_shard_rollup_size_bytes",
help: "total encoded rollup size by shard",
var_labels: ["shard", "name"],
)),
encoded_diff_size: registry.register(metric!(
name: "mz_persist_shard_diff_size_bytes",
help: "total encoded diff size by shard",
var_labels: ["shard", "name"],
)),
hollow_batch_count: registry.register(metric!(
name: "mz_persist_shard_hollow_batch_count",
help: "count of hollow batches by shard",
var_labels: ["shard", "name"],
)),
spine_batch_count: registry.register(metric!(
name: "mz_persist_shard_spine_batch_count",
help: "count of spine batches by shard",
var_labels: ["shard", "name"],
)),
batch_part_count: registry.register(metric!(
name: "mz_persist_shard_batch_part_count",
help: "count of batch parts by shard",
var_labels: ["shard", "name"],
)),
batch_part_version_count: registry.register(metric!(
name: "mz_persist_shard_batch_part_version_count",
help: "count of batch parts by shard and version",
var_labels: ["shard", "name", "version"],
)),
batch_part_version_bytes: registry.register(metric!(
name: "mz_persist_shard_batch_part_version_bytes",
help: "total bytes in batch parts by shard and version",
var_labels: ["shard", "name", "version"],
)),
update_count: registry.register(metric!(
name: "mz_persist_shard_update_count",
help: "count of updates by shard",
var_labels: ["shard", "name"],
)),
rollup_count: registry.register(metric!(
name: "mz_persist_shard_rollup_count",
help: "count of rollups by shard",
var_labels: ["shard", "name"],
)),
largest_batch_size: registry.register(metric!(
name: "mz_persist_shard_largest_batch_size",
help: "largest encoded batch size by shard",
var_labels: ["shard", "name"],
)),
seqnos_held: registry.register(metric!(
name: "mz_persist_shard_seqnos_held",
help: "maximum count of gc-ineligible states by shard",
var_labels: ["shard", "name"],
)),
seqnos_since_last_rollup: registry.register(metric!(
name: "mz_persist_shard_seqnos_since_last_rollup",
help: "count of seqnos since last rollup",
var_labels: ["shard", "name"],
)),
gc_seqno_held_parts: registry.register(metric!(
name: "mz_persist_shard_gc_seqno_held_parts",
help: "count of parts referenced by some live state but not the current state (ie. parts kept only to satisfy seqno holds) at GC time",
var_labels: ["shard", "name"],
)),
gc_live_diffs: registry.register(metric!(
name: "mz_persist_shard_gc_live_diffs",
help: "the number of diffs (or, alternatively, the number of seqnos) present in consensus state at GC time",
var_labels: ["shard", "name"],
)),
gc_finished: registry.register(metric!(
name: "mz_persist_shard_gc_finished",
help: "count of garbage collections finished by shard",
var_labels: ["shard", "name"],
)),
compaction_applied: registry.register(metric!(
name: "mz_persist_shard_compaction_applied",
help: "count of compactions applied to state by shard",
var_labels: ["shard", "name"],
)),
cmd_succeeded: registry.register(metric!(
name: "mz_persist_shard_cmd_succeeded",
help: "count of commands succeeded by shard",
var_labels: ["shard", "name"],
)),
usage_current_state_batches_bytes: registry.register(metric!(
name: "mz_persist_shard_usage_current_state_batches_bytes",
help: "data in batches/parts referenced by current version of state",
var_labels: ["shard", "name"],
)),
usage_current_state_rollups_bytes: registry.register(metric!(
name: "mz_persist_shard_usage_current_state_rollups_bytes",
help: "data in rollups referenced by current version of state",
var_labels: ["shard", "name"],
)),
usage_referenced_not_current_state_bytes: registry.register(metric!(
name: "mz_persist_shard_usage_referenced_not_current_state_bytes",
help: "data referenced only by a previous version of state",
var_labels: ["shard", "name"],
)),
usage_not_leaked_not_referenced_bytes: registry.register(metric!(
name: "mz_persist_shard_usage_not_leaked_not_referenced_bytes",
help: "data written by an active writer but not referenced by any version of state",
var_labels: ["shard", "name"],
)),
usage_leaked_bytes: registry.register(metric!(
name: "mz_persist_shard_usage_leaked_bytes",
help: "data reclaimable by a leaked blob detector",
var_labels: ["shard", "name"],
)),
pubsub_push_diff_applied: registry.register(metric!(
name: "mz_persist_shard_pubsub_diff_applied",
help: "number of diffs received via pubsub that applied",
var_labels: ["shard", "name"],
)),
pubsub_push_diff_not_applied_stale: registry.register(metric!(
name: "mz_persist_shard_pubsub_diff_not_applied_stale",
help: "number of diffs received via pubsub that did not apply due to staleness",
var_labels: ["shard", "name"],
)),
pubsub_push_diff_not_applied_out_of_order: registry.register(metric!(
name: "mz_persist_shard_pubsub_diff_not_applied_out_of_order",
help: "number of diffs received via pubsub that did not apply due to out-of-order delivery",
var_labels: ["shard", "name"],
)),
blob_gets: registry.register(metric!(
name: "mz_persist_shard_blob_gets",
help: "number of Blob::get calls for this shard",
var_labels: ["shard", "name"],
)),
blob_sets: registry.register(metric!(
name: "mz_persist_shard_blob_sets",
help: "number of Blob::set calls for this shard",
var_labels: ["shard", "name"],
)),
live_writers: registry.register(metric!(
name: "mz_persist_shard_live_writers",
help: "number of writers that have recently appended updates to this shard",
var_labels: ["shard", "name"],
)),
unconsolidated_snapshot: registry.register(metric!(
name: "mz_persist_shard_unconsolidated_snapshot",
help: "in snapshot_and_read, the number of times consolidating the raw data wasn't enough to produce consolidated output",
var_labels: ["shard", "name"],
)),
backpressure_emitted_bytes: registry.register(metric!(
name: "mz_persist_backpressure_emitted_bytes",
help: "A counter with the number of emitted bytes.",
var_labels: ["shard", "name"],
)),
backpressure_last_backpressured_bytes: registry.register(metric!(
name: "mz_persist_backpressure_last_backpressured_bytes",
help: "The last count of bytes we are waiting to be retired in \
the operator. This cannot be directly compared to \
`retired_bytes`, but CAN indicate that backpressure is happening.",
var_labels: ["shard", "name"],
)),
backpressure_retired_bytes: registry.register(metric!(
name: "mz_persist_backpressure_retired_bytes",
help:"A counter with the number of bytes retired by downstream processing.",
var_labels: ["shard", "name"],
)),
rewrite_part_count: registry.register(metric!(
name: "mz_persist_shard_rewrite_part_count",
help: "count of batch parts with rewrites by shard",
var_labels: ["shard", "name"],
)),
inline_part_count: registry.register(metric!(
name: "mz_persist_shard_inline_part_count",
help: "count of parts inline in shard metadata",
var_labels: ["shard", "name"],
)),
inline_part_bytes: registry.register(metric!(
name: "mz_persist_shard_inline_part_bytes",
help: "total size of parts inline in shard metadata",
var_labels: ["shard", "name"],
)),
compact_batches: registry.register(metric!(
name: "mz_persist_shard_compact_batches",
help: "number of fully compact batches in the shard",
var_labels: ["shard", "name"],
)),
compacting_batches: registry.register(metric!(
name: "mz_persist_shard_compacting_batches",
help: "number of batches in the shard with compactions in progress",
var_labels: ["shard", "name"],
)),
noncompact_batches: registry.register(metric!(
name: "mz_persist_shard_noncompact_batches",
help: "number of batches in the shard that aren't compact and have no ongoing compaction",
var_labels: ["shard", "name"],
)),
schema_registry_version_count: registry.register(metric!(
name: "mz_persist_shard_schema_registry_version_count",
help: "count of versions in the schema registry",
var_labels: ["shard", "name"],
)),
inline_backpressure_count: registry.register(metric!(
name: "mz_persist_shard_inline_backpressure_count",
help: "count of CaA attempts retried because of inline backpressure",
var_labels: ["shard", "name"],
)),
shards,
}
}
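    /// Returns the shared [ShardMetrics] handle for `shard_id`, creating and
    /// caching one if necessary. Handles are held weakly, so a shard's
    /// metrics are deregistered once every outstanding handle is dropped.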
pub fn shard(&self, shard_id: &ShardId, name: &str) -> Arc<ShardMetrics> {
let mut shards = self.shards.lock().expect("mutex poisoned");
if let Some(shard) = shards.get(shard_id) {
if let Some(shard) = shard.upgrade() {
return Arc::clone(&shard);
} else {
assert!(shards.remove(shard_id).is_some());
}
}
let shard = Arc::new(ShardMetrics::new(shard_id, name, self));
assert!(shards
.insert(shard_id.clone(), Arc::downgrade(&shard))
.is_none());
shard
}
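    /// Applies `f` to the metrics of every live shard, pruning entries whose
    /// metrics have already been dropped.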
fn compute<F: FnMut(&ShardMetrics)>(
shards: &Arc<Mutex<BTreeMap<ShardId, Weak<ShardMetrics>>>>,
mut f: F,
) {
let mut shards = shards.lock().expect("mutex poisoned");
let mut deleted_shards = Vec::new();
for (shard_id, metrics) in shards.iter() {
if let Some(metrics) = metrics.upgrade() {
f(&metrics);
} else {
deleted_shards.push(shard_id.clone());
}
}
for deleted_shard_id in deleted_shards {
assert!(shards.remove(&deleted_shard_id).is_some());
}
}
}
#[derive(Debug)]
pub struct ShardMetrics {
pub shard_id: ShardId,
pub name: String,
pub since: DeleteOnDropGauge<'static, AtomicI64, Vec<String>>,
pub upper: DeleteOnDropGauge<'static, AtomicI64, Vec<String>>,
pub largest_batch_size: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub latest_rollup_size: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub encoded_diff_size: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
pub hollow_batch_count: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub spine_batch_count: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub batch_part_count: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
batch_part_version_count: mz_ore::metrics::UIntGaugeVec,
batch_part_version_bytes: mz_ore::metrics::UIntGaugeVec,
batch_part_version_map: Mutex<BTreeMap<String, BatchPartVersionMetrics>>,
pub update_count: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub rollup_count: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub seqnos_held: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub seqnos_since_last_rollup: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub gc_seqno_held_parts: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub gc_live_diffs: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub usage_current_state_batches_bytes: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub usage_current_state_rollups_bytes: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub usage_referenced_not_current_state_bytes:
DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub usage_not_leaked_not_referenced_bytes: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub usage_leaked_bytes: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub gc_finished: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
pub compaction_applied: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
pub cmd_succeeded: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
pub pubsub_push_diff_applied: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
pub pubsub_push_diff_not_applied_stale: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
pub pubsub_push_diff_not_applied_out_of_order:
DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
pub blob_gets: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
pub blob_sets: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
pub live_writers: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub unconsolidated_snapshot: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
pub backpressure_emitted_bytes: Arc<DeleteOnDropCounter<'static, AtomicU64, Vec<String>>>,
pub backpressure_last_backpressured_bytes:
Arc<DeleteOnDropGauge<'static, AtomicU64, Vec<String>>>,
pub backpressure_retired_bytes: Arc<DeleteOnDropCounter<'static, AtomicU64, Vec<String>>>,
pub rewrite_part_count: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub inline_part_count: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub inline_part_bytes: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub compact_batches: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub compacting_batches: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub noncompact_batches: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub schema_registry_version_count: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub inline_backpressure_count: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
}
impl ShardMetrics {
pub fn new(shard_id: &ShardId, name: &str, shards_metrics: &ShardsMetrics) -> Self {
let shard = shard_id.to_string();
ShardMetrics {
shard_id: *shard_id,
name: name.to_string(),
since: shards_metrics
.since
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
upper: shards_metrics
.upper
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
latest_rollup_size: shards_metrics
.encoded_rollup_size
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
encoded_diff_size: shards_metrics
.encoded_diff_size
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
hollow_batch_count: shards_metrics
.hollow_batch_count
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
spine_batch_count: shards_metrics
.spine_batch_count
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
batch_part_count: shards_metrics
.batch_part_count
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
batch_part_version_count: shards_metrics.batch_part_version_count.clone(),
batch_part_version_bytes: shards_metrics.batch_part_version_bytes.clone(),
batch_part_version_map: Mutex::new(BTreeMap::new()),
update_count: shards_metrics
.update_count
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
rollup_count: shards_metrics
.rollup_count
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
largest_batch_size: shards_metrics
.largest_batch_size
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
seqnos_held: shards_metrics
.seqnos_held
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
seqnos_since_last_rollup: shards_metrics
.seqnos_since_last_rollup
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
gc_seqno_held_parts: shards_metrics
.gc_seqno_held_parts
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
gc_live_diffs: shards_metrics
.gc_live_diffs
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
gc_finished: shards_metrics
.gc_finished
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
compaction_applied: shards_metrics
.compaction_applied
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
cmd_succeeded: shards_metrics
.cmd_succeeded
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
usage_current_state_batches_bytes: shards_metrics
.usage_current_state_batches_bytes
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
usage_current_state_rollups_bytes: shards_metrics
.usage_current_state_rollups_bytes
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
usage_referenced_not_current_state_bytes: shards_metrics
.usage_referenced_not_current_state_bytes
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
usage_not_leaked_not_referenced_bytes: shards_metrics
.usage_not_leaked_not_referenced_bytes
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
usage_leaked_bytes: shards_metrics
.usage_leaked_bytes
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
pubsub_push_diff_applied: shards_metrics
.pubsub_push_diff_applied
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
pubsub_push_diff_not_applied_stale: shards_metrics
.pubsub_push_diff_not_applied_stale
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
pubsub_push_diff_not_applied_out_of_order: shards_metrics
.pubsub_push_diff_not_applied_out_of_order
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
blob_gets: shards_metrics
.blob_gets
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
blob_sets: shards_metrics
.blob_sets
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
live_writers: shards_metrics
.live_writers
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
unconsolidated_snapshot: shards_metrics
.unconsolidated_snapshot
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
backpressure_emitted_bytes: Arc::new(
shards_metrics
.backpressure_emitted_bytes
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
),
backpressure_last_backpressured_bytes: Arc::new(
shards_metrics
.backpressure_last_backpressured_bytes
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
),
backpressure_retired_bytes: Arc::new(
shards_metrics
.backpressure_retired_bytes
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
),
rewrite_part_count: shards_metrics
.rewrite_part_count
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
inline_part_count: shards_metrics
.inline_part_count
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
inline_part_bytes: shards_metrics
.inline_part_bytes
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
compact_batches: shards_metrics
.compact_batches
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
compacting_batches: shards_metrics
.compacting_batches
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
noncompact_batches: shards_metrics
.noncompact_batches
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
schema_registry_version_count: shards_metrics
.schema_registry_version_count
.get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
inline_backpressure_count: shards_metrics
.inline_backpressure_count
.get_delete_on_drop_metric(vec![shard, name.to_string()]),
}
}
pub fn set_since<T: Codec64>(&self, since: &Antichain<T>) {
self.since.set(encode_ts_metric(since))
}
pub fn set_upper<T: Codec64>(&self, upper: &Antichain<T>) {
self.upper.set(encode_ts_metric(upper))
}
pub(crate) fn set_batch_part_versions<'a>(
&self,
batch_parts_by_version: impl Iterator<Item = (&'a str, usize)>,
) {
let mut map = self
.batch_part_version_map
.lock()
.expect("mutex should not be poisoned");
for x in map.values() {
x.batch_part_version_count.set(0);
x.batch_part_version_bytes.set(0);
}
for (key, bytes) in batch_parts_by_version {
// Lazily create the per-version metrics the first time a version is seen.
let value = map.entry(key.to_owned()).or_insert_with(|| BatchPartVersionMetrics {
batch_part_version_count: self.batch_part_version_count.get_delete_on_drop_metric(vec![
self.shard_id.to_string(),
self.name.clone(),
key.to_owned(),
]),
batch_part_version_bytes: self.batch_part_version_bytes.get_delete_on_drop_metric(vec![
self.shard_id.to_string(),
self.name.clone(),
key.to_owned(),
]),
});
value.batch_part_version_count.inc();
value.batch_part_version_bytes.add(u64::cast_from(bytes));
}
}
}
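/// Gauges for the count and total size of batch parts at a given version,
/// labeled by shard, shard name, and version string. Values are reset and
/// recomputed on each call to `set_batch_part_versions`.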
#[derive(Debug)]
pub struct BatchPartVersionMetrics {
pub batch_part_version_count: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub batch_part_version_bytes: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
}
#[derive(Debug)]
pub struct UsageAuditMetrics {
pub blob_batch_part_bytes: UIntGauge,
pub blob_batch_part_count: UIntGauge,
pub blob_rollup_bytes: UIntGauge,
pub blob_rollup_count: UIntGauge,
pub blob_bytes: UIntGauge,
pub blob_count: UIntGauge,
pub step_blob_metadata: Counter,
pub step_state: Counter,
pub step_math: Counter,
}
impl UsageAuditMetrics {
fn new(registry: &MetricsRegistry) -> Self {
let step_timings: CounterVec = registry.register(metric!(
name: "mz_persist_audit_step_seconds",
help: "time spent on individual steps of audit",
var_labels: ["step"],
));
UsageAuditMetrics {
blob_batch_part_bytes: registry.register(metric!(
name: "mz_persist_audit_blob_batch_part_bytes",
help: "total size of batch parts in blob",
)),
blob_batch_part_count: registry.register(metric!(
name: "mz_persist_audit_blob_batch_part_count",
help: "count of batch parts in blob",
)),
blob_rollup_bytes: registry.register(metric!(
name: "mz_persist_audit_blob_rollup_bytes",
help: "total size of state rollups stored in blob",
)),
blob_rollup_count: registry.register(metric!(
name: "mz_persist_audit_blob_rollup_count",
help: "count of all state rollups in blob",
)),
blob_bytes: registry.register(metric!(
name: "mz_persist_audit_blob_bytes",
help: "total size of blob",
)),
blob_count: registry.register(metric!(
name: "mz_persist_audit_blob_count",
help: "count of all blobs",
)),
step_blob_metadata: step_timings.with_label_values(&["blob_metadata"]),
step_state: step_timings.with_label_values(&["state"]),
step_math: step_timings.with_label_values(&["math"]),
}
}
}
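/// The change between an old and a new `usize` value, stored as a direction
/// plus an unsigned magnitude so it can feed monotonic counters. For example,
/// `UpdateDelta::new(3, 5)` is `Negative(2)` and `UpdateDelta::new(5, 3)` is
/// `NonNegative(2)`.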
#[derive(Debug)]
pub enum UpdateDelta {
Negative(u64),
NonNegative(u64),
}
impl UpdateDelta {
pub fn new(new: usize, old: usize) -> Self {
if new < old {
UpdateDelta::Negative(CastFrom::cast_from(old - new))
} else {
UpdateDelta::NonNegative(CastFrom::cast_from(new - old))
}
}
}
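/// Process-wide metrics for persist sink correction buffers: cumulative
/// insertions, deletions, and capacity changes, plus per-worker maxima tracked
/// through the labeled gauge vectors below.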
#[derive(Debug, Clone)]
pub struct SinkMetrics {
correction_insertions_total: IntCounter,
correction_deletions_total: IntCounter,
correction_capacity_increases_total: IntCounter,
correction_capacity_decreases_total: IntCounter,
correction_max_per_sink_worker_len_updates: raw::UIntGaugeVec,
correction_max_per_sink_worker_capacity_updates: raw::UIntGaugeVec,
}
impl SinkMetrics {
fn new(registry: &MetricsRegistry) -> Self {
SinkMetrics {
correction_insertions_total: registry.register(metric!(
name: "mz_persist_sink_correction_insertions_total",
help: "The cumulative insertions observed on the correction buffer across workers and persist sinks.",
)),
correction_deletions_total: registry.register(metric!(
name: "mz_persist_sink_correction_deletions_total",
help: "The cumulative deletions observed on the correction buffer across workers and persist sinks.",
)),
correction_capacity_increases_total: registry.register(metric!(
name: "mz_persist_sink_correction_capacity_increases_total",
help: "The cumulative capacity increases observed on the correction buffer across workers and persist sinks.",
)),
correction_capacity_decreases_total: registry.register(metric!(
name: "mz_persist_sink_correction_capacity_decreases_total",
help: "The cumulative capacity decreases observed on the correction buffer across workers and persist sinks.",
)),
correction_max_per_sink_worker_len_updates: registry.register(metric!(
name: "mz_persist_sink_correction_max_per_sink_worker_len_updates",
help: "The maximum length observed for the correction buffer of any single persist sink per worker.",
var_labels: ["worker_id"],
)),
correction_max_per_sink_worker_capacity_updates: registry.register(metric!(
name: "mz_persist_sink_correction_max_per_sink_worker_capacity_updates",
help: "The maximum capacity observed for the correction buffer of any single persist sink per worker.",
var_labels: ["worker_id"],
)),
}
}
pub fn for_worker(&self, worker_id: usize) -> SinkWorkerMetrics {
let worker = worker_id.to_string();
let correction_max_per_sink_worker_len_updates = self
.correction_max_per_sink_worker_len_updates
.with_label_values(&[&worker]);
let correction_max_per_sink_worker_capacity_updates = self
.correction_max_per_sink_worker_capacity_updates
.with_label_values(&[&worker]);
SinkWorkerMetrics {
correction_max_per_sink_worker_len_updates,
correction_max_per_sink_worker_capacity_updates,
}
}
pub fn report_correction_update_deltas(
&self,
correction_len_delta: UpdateDelta,
correction_cap_delta: UpdateDelta,
) {
match correction_len_delta {
UpdateDelta::NonNegative(delta) => {
if delta > 0 {
self.correction_insertions_total.inc_by(delta)
}
}
UpdateDelta::Negative(delta) => self.correction_deletions_total.inc_by(delta),
}
match correction_cap_delta {
UpdateDelta::NonNegative(delta) => {
if delta > 0 {
self.correction_capacity_increases_total.inc_by(delta)
}
}
UpdateDelta::Negative(delta) => self.correction_capacity_decreases_total.inc_by(delta),
}
}
}
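/// A single worker's view of the correction buffer maxima gauges;
/// `report_correction_update_totals` only ever ratchets them upward.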
#[derive(Clone, Debug)]
pub struct SinkWorkerMetrics {
correction_max_per_sink_worker_len_updates: UIntGauge,
correction_max_per_sink_worker_capacity_updates: UIntGauge,
}
impl SinkWorkerMetrics {
pub fn report_correction_update_totals(&self, correction_len: usize, correction_cap: usize) {
let correction_len = CastFrom::cast_from(correction_len);
if correction_len > self.correction_max_per_sink_worker_len_updates.get() {
self.correction_max_per_sink_worker_len_updates
.set(correction_len);
}
let correction_cap = CastFrom::cast_from(correction_cap);
if correction_cap > self.correction_max_per_sink_worker_capacity_updates.get() {
self.correction_max_per_sink_worker_capacity_updates
.set(correction_cap);
}
}
}
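/// Failure counters intended to drive alerting on blob and consensus
/// operations.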
#[derive(Debug)]
pub struct AlertsMetrics {
pub(crate) blob_failures: IntCounter,
pub(crate) consensus_failures: IntCounter,
}
impl AlertsMetrics {
fn new(registry: &MetricsRegistry) -> Self {
AlertsMetrics {
blob_failures: registry.register(metric!(
name: "mz_persist_blob_failures",
help: "count of all blob operation failures",
const_labels: {"honeycomb" => "import"},
)),
consensus_failures: registry.register(metric!(
name: "mz_persist_consensus_failures",
help: "count of determinate consensus operation failures",
const_labels: {"honeycomb" => "import"},
)),
}
}
}
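/// Server-side metrics for persist PubSub: active connections, broadcast diff
/// volume, and per-operation timings and call counts.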
#[derive(Debug)]
pub struct PubSubServerMetrics {
pub(crate) active_connections: UIntGauge,
pub(crate) broadcasted_diff_count: IntCounter,
pub(crate) broadcasted_diff_bytes: IntCounter,
pub(crate) broadcasted_diff_dropped_channel_full: IntCounter,
pub(crate) push_seconds: Counter,
pub(crate) subscribe_seconds: Counter,
pub(crate) unsubscribe_seconds: Counter,
pub(crate) connection_cleanup_seconds: Counter,
pub(crate) push_call_count: IntCounter,
pub(crate) subscribe_call_count: IntCounter,
pub(crate) unsubscribe_call_count: IntCounter,
}
impl PubSubServerMetrics {
pub(crate) fn new(registry: &MetricsRegistry) -> Self {
let op_timings: CounterVec = registry.register(metric!(
name: "mz_persist_pubsub_server_operation_seconds",
help: "time spent in pubsub server performing each operation",
var_labels: ["op"],
));
let call_count: IntCounterVec = registry.register(metric!(
name: "mz_persist_pubsub_server_call_count",
help: "count of each pubsub server message received",
var_labels: ["call"],
));
Self {
active_connections: registry.register(metric!(
name: "mz_persist_pubsub_server_active_connections",
help: "number of active connections to server",
)),
broadcasted_diff_count: registry.register(metric!(
name: "mz_persist_pubsub_server_broadcasted_diff_count",
help: "count of total broadcast diff messages sent",
)),
broadcasted_diff_bytes: registry.register(metric!(
name: "mz_persist_pubsub_server_broadcasted_diff_bytes",
help: "count of total broadcast diff bytes sent",
)),
broadcasted_diff_dropped_channel_full: registry.register(metric!(
name: "mz_persist_pubsub_server_broadcasted_diff_dropped_channel_full",
help: "count of diffs dropped due to full connection channel",
)),
push_seconds: op_timings.with_label_values(&["push"]),
subscribe_seconds: op_timings.with_label_values(&["subscribe"]),
unsubscribe_seconds: op_timings.with_label_values(&["unsubscribe"]),
connection_cleanup_seconds: op_timings.with_label_values(&["cleanup"]),
push_call_count: call_count.with_label_values(&["push"]),
subscribe_call_count: call_count.with_label_values(&["subscribe"]),
unsubscribe_call_count: call_count.with_label_values(&["unsubscribe"]),
}
}
}
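/// Client-side metrics for persist PubSub, split across the sender, the
/// receiver, and the underlying gRPC connection.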
#[derive(Debug)]
pub struct PubSubClientMetrics {
pub sender: PubSubClientSenderMetrics,
pub receiver: PubSubClientReceiverMetrics,
pub grpc_connection: PubSubGrpcClientConnectionMetrics,
}
impl PubSubClientMetrics {
fn new(registry: &MetricsRegistry) -> Self {
PubSubClientMetrics {
sender: PubSubClientSenderMetrics::new(registry),
receiver: PubSubClientReceiverMetrics::new(registry),
grpc_connection: PubSubGrpcClientConnectionMetrics::new(registry),
}
}
}
#[derive(Debug)]
pub struct PubSubGrpcClientConnectionMetrics {
pub(crate) connected: UIntGauge,
pub(crate) connection_established_count: IntCounter,
pub(crate) connect_call_attempt_count: IntCounter,
pub(crate) broadcast_recv_lagged_count: IntCounter,
pub(crate) grpc_error_count: IntCounter,
}
impl PubSubGrpcClientConnectionMetrics {
fn new(registry: &MetricsRegistry) -> Self {
Self {
connected: registry.register(metric!(
name: "mz_persist_pubsub_client_grpc_connected",
help: "whether the grpc client is currently connected",
)),
connection_established_count: registry.register(metric!(
name: "mz_persist_pubsub_client_grpc_connection_established_count",
help: "count of grpc connection establishments to pubsub server",
)),
connect_call_attempt_count: registry.register(metric!(
name: "mz_persist_pubsub_client_grpc_connect_call_attempt_count",
help: "count of connection call attempts (including retries) to pubsub server",
)),
broadcast_recv_lagged_count: registry.register(metric!(
name: "mz_persist_pubsub_client_grpc_broadcast_recv_lagged_count",
help: "times a message was missed by broadcast receiver due to lag",
)),
grpc_error_count: registry.register(metric!(
name: "mz_persist_pubsub_client_grpc_error_count",
help: "count of grpc errors received",
)),
}
}
}
#[derive(Clone, Debug)]
pub struct PubSubClientReceiverMetrics {
pub(crate) push_received: IntCounter,
pub(crate) unknown_message_received: IntCounter,
pub(crate) approx_diff_latency_seconds: Histogram,
pub(crate) state_pushed_diff_fast_path: IntCounter,
pub(crate) state_pushed_diff_slow_path_succeeded: IntCounter,
pub(crate) state_pushed_diff_slow_path_failed: IntCounter,
}
impl PubSubClientReceiverMetrics {
fn new(registry: &MetricsRegistry) -> Self {
let call_received: IntCounterVec = registry.register(metric!(
name: "mz_persist_pubsub_client_call_received",
help: "times a pubsub client call was received",
var_labels: ["call"],
));
Self {
push_received: call_received.with_label_values(&["push"]),
unknown_message_received: call_received.with_label_values(&["unknown"]),
approx_diff_latency_seconds: registry.register(metric!(
name: "mz_persist_pubsub_client_approx_diff_apply_latency_seconds",
help: "histogram of (approximate) latency between sending a diff and applying it",
buckets: prometheus::exponential_buckets(0.001, 2.0, 13).expect("buckets"),
)),
state_pushed_diff_fast_path: registry.register(metric!(
name: "mz_persist_pubsub_client_receiver_state_push_diff_fast_path",
help: "count fast-path state push_diff calls",
)),
state_pushed_diff_slow_path_succeeded: registry.register(metric!(
name: "mz_persist_pubsub_client_receiver_state_push_diff_slow_path_succeeded",
help: "count of successful slow-path state push_diff calls",
)),
state_pushed_diff_slow_path_failed: registry.register(metric!(
name: "mz_persist_pubsub_client_receiver_state_push_diff_slow_path_failed",
help: "count of unsuccessful slow-path state push_diff calls",
)),
}
}
}
#[derive(Debug)]
pub struct PubSubClientSenderMetrics {
pub push: PubSubClientCallMetrics,
pub subscribe: PubSubClientCallMetrics,
pub unsubscribe: PubSubClientCallMetrics,
}
#[derive(Debug)]
pub struct PubSubClientCallMetrics {
pub(crate) succeeded: IntCounter,
pub(crate) bytes_sent: IntCounter,
pub(crate) failed: IntCounter,
}
impl PubSubClientSenderMetrics {
fn new(registry: &MetricsRegistry) -> Self {
let call_bytes_sent: IntCounterVec = registry.register(metric!(
name: "mz_persist_pubsub_client_call_bytes_sent",
help: "number of bytes sent for a given pubsub client call",
var_labels: ["call"],
));
let call_succeeded: IntCounterVec = registry.register(metric!(
name: "mz_persist_pubsub_client_call_succeeded",
help: "times a pubsub client call succeeded",
var_labels: ["call"],
));
let call_failed: IntCounterVec = registry.register(metric!(
name: "mz_persist_pubsub_client_call_failed",
help: "times a pubsub client call failed",
var_labels: ["call"],
));
Self {
push: PubSubClientCallMetrics {
succeeded: call_succeeded.with_label_values(&["push"]),
failed: call_failed.with_label_values(&["push"]),
bytes_sent: call_bytes_sent.with_label_values(&["push"]),
},
subscribe: PubSubClientCallMetrics {
succeeded: call_succeeded.with_label_values(&["subscribe"]),
failed: call_failed.with_label_values(&["subscribe"]),
bytes_sent: call_bytes_sent.with_label_values(&["subscribe"]),
},
unsubscribe: PubSubClientCallMetrics {
succeeded: call_succeeded.with_label_values(&["unsubscribe"]),
failed: call_failed.with_label_values(&["unsubscribe"]),
bytes_sent: call_bytes_sent.with_label_values(&["unsubscribe"]),
},
}
}
}
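/// Metrics for the locks guarding applier state and watches: acquisition
/// counts and time spent blocking, per lock.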
#[derive(Debug)]
pub struct LocksMetrics {
pub(crate) applier_read_cacheable: LockMetrics,
pub(crate) applier_read_noncacheable: LockMetrics,
pub(crate) applier_write: LockMetrics,
pub(crate) watch: LockMetrics,
}
#[derive(Debug, Clone)]
pub struct LockMetrics {
pub(crate) acquire_count: IntCounter,
pub(crate) blocking_acquire_count: IntCounter,
pub(crate) blocking_seconds: Counter,
}
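/// Metrics for state watches: whether listens and snapshots were woken and
/// resolved via watch notifications or sleeps, plus broadcast channel traffic.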
#[derive(Debug)]
pub struct WatchMetrics {
pub(crate) listen_woken_via_watch: IntCounter,
pub(crate) listen_woken_via_sleep: IntCounter,
pub(crate) listen_resolved_via_watch: IntCounter,
pub(crate) listen_resolved_via_sleep: IntCounter,
pub(crate) snapshot_woken_via_watch: IntCounter,
pub(crate) snapshot_woken_via_sleep: IntCounter,
pub(crate) notify_sent: IntCounter,
pub(crate) notify_noop: IntCounter,
pub(crate) notify_recv: IntCounter,
pub(crate) notify_lagged: IntCounter,
pub(crate) notify_wait_started: IntCounter,
pub(crate) notify_wait_finished: IntCounter,
}
impl WatchMetrics {
fn new(registry: &MetricsRegistry) -> Self {
WatchMetrics {
listen_woken_via_watch: registry.register(metric!(
name: "mz_persist_listen_woken_via_watch",
help: "count of listen next batches wakes via watch notify",
)),
listen_woken_via_sleep: registry.register(metric!(
name: "mz_persist_listen_woken_via_sleep",
help: "count of listen next batches wakes via sleep",
)),
listen_resolved_via_watch: registry.register(metric!(
name: "mz_persist_listen_resolved_via_watch",
help: "count of listen next batches resolved via watch notify",
)),
listen_resolved_via_sleep: registry.register(metric!(
name: "mz_persist_listen_resolved_via_sleep",
help: "count of listen next batches resolved via sleep",
)),
snapshot_woken_via_watch: registry.register(metric!(
name: "mz_persist_snapshot_woken_via_watch",
help: "count of snapshot wakes via watch notify",
)),
snapshot_woken_via_sleep: registry.register(metric!(
name: "mz_persist_snapshot_woken_via_sleep",
help: "count of snapshot wakes via sleep",
)),
notify_sent: registry.register(metric!(
name: "mz_persist_watch_notify_sent",
help: "count of watch notifications sent to a non-empty broadcast channel",
)),
notify_noop: registry.register(metric!(
name: "mz_persist_watch_notify_noop",
help: "count of watch notifications sent to an broadcast channel",
)),
notify_recv: registry.register(metric!(
name: "mz_persist_watch_notify_recv",
help: "count of watch notifications received from the broadcast channel",
)),
notify_lagged: registry.register(metric!(
name: "mz_persist_watch_notify_lagged",
help: "count of lagged events in the watch notification broadcast channel",
)),
notify_wait_started: registry.register(metric!(
name: "mz_persist_watch_notify_wait_started",
help: "count of watch wait calls started",
)),
notify_wait_finished: registry.register(metric!(
name: "mz_persist_watch_notify_wait_finished",
help: "count of watch wait calls resolved",
)),
}
}
}
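/// Metrics for filter and projection pushdown: counts and bytes of parts that
/// were filtered out, fetched anyway, audited, served inline, faked, or had
/// their stats trimmed.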
#[derive(Debug)]
pub struct PushdownMetrics {
pub(crate) parts_filtered_count: IntCounter,
pub(crate) parts_filtered_bytes: IntCounter,
pub(crate) parts_fetched_count: IntCounter,
pub(crate) parts_fetched_bytes: IntCounter,
pub(crate) parts_audited_count: IntCounter,
pub(crate) parts_audited_bytes: IntCounter,
pub(crate) parts_inline_count: IntCounter,
pub(crate) parts_inline_bytes: IntCounter,
pub(crate) parts_faked_count: IntCounter,
pub(crate) parts_faked_bytes: IntCounter,
pub(crate) parts_stats_trimmed_count: IntCounter,
pub(crate) parts_stats_trimmed_bytes: IntCounter,
pub part_stats: PartStatsMetrics,
}
impl PushdownMetrics {
fn new(registry: &MetricsRegistry) -> Self {
PushdownMetrics {
parts_filtered_count: registry.register(metric!(
name: "mz_persist_pushdown_parts_filtered_count",
help: "count of parts filtered by pushdown",
)),
parts_filtered_bytes: registry.register(metric!(
name: "mz_persist_pushdown_parts_filtered_bytes",
help: "total size of parts filtered by pushdown in bytes",
)),
parts_fetched_count: registry.register(metric!(
name: "mz_persist_pushdown_parts_fetched_count",
help: "count of parts not filtered by pushdown",
)),
parts_fetched_bytes: registry.register(metric!(
name: "mz_persist_pushdown_parts_fetched_bytes",
help: "total size of parts not filtered by pushdown in bytes",
)),
parts_audited_count: registry.register(metric!(
name: "mz_persist_pushdown_parts_audited_count",
help: "count of parts fetched only for pushdown audit",
)),
parts_audited_bytes: registry.register(metric!(
name: "mz_persist_pushdown_parts_audited_bytes",
help: "total size of parts fetched only for pushdown audit",
)),
parts_inline_count: registry.register(metric!(
name: "mz_persist_pushdown_parts_inline_count",
help: "count of parts not fetched because they were inline",
)),
parts_inline_bytes: registry.register(metric!(
name: "mz_persist_pushdown_parts_inline_bytes",
help: "total size of parts not fetched because they were inline",
)),
parts_faked_count: registry.register(metric!(
name: "mz_persist_pushdown_parts_faked_count",
help: "count of parts faked because of aggressive projection pushdown",
)),
parts_faked_bytes: registry.register(metric!(
name: "mz_persist_pushdown_parts_faked_bytes",
help: "total size of parts replaced with fakes by aggressive projection pushdown",
)),
parts_stats_trimmed_count: registry.register(metric!(
name: "mz_persist_pushdown_parts_stats_trimmed_count",
help: "count of trimmed part stats",
)),
parts_stats_trimmed_bytes: registry.register(metric!(
name: "mz_persist_pushdown_parts_stats_trimmed_bytes",
help: "total bytes trimmed from part stats",
)),
part_stats: PartStatsMetrics::new(registry),
}
}
}
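/// Metrics for streaming consolidation: how many fetched parts were actually
/// used, skipped entirely, or fetched but wasted, and how often a run was
/// sorted with the wrong ordering.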
#[derive(Debug)]
pub struct ConsolidationMetrics {
pub(crate) parts_fetched: IntCounter,
pub(crate) parts_skipped: IntCounter,
pub(crate) parts_wasted: IntCounter,
pub(crate) wrong_sort: IntCounter,
}
impl ConsolidationMetrics {
fn new(registry: &MetricsRegistry) -> Self {
ConsolidationMetrics {
parts_fetched: registry.register(metric!(
name: "mz_persist_consolidation_parts_fetched_count",
help: "count of parts that were fetched and used during consolidation",
)),
parts_skipped: registry.register(metric!(
name: "mz_persist_consolidation_parts_skipped_count",
help: "count of parts that were never needed during consolidation",
)),
parts_wasted: registry.register(metric!(
name: "mz_persist_consolidation_parts_wasted_count",
help: "count of parts that were fetched but not needed during consolidation",
)),
wrong_sort: registry.register(metric!(
name: "mz_persist_consolidation_wrong_sort_count",
help: "count of runs that were sorted using the wrong ordering for the current consolidation",
)),
}
}
}
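/// Metrics for the in-memory blob cache: current size, hit volume, and
/// capacity-based evictions.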
#[derive(Debug)]
pub struct BlobMemCache {
pub(crate) size_blobs: UIntGauge,
pub(crate) size_bytes: UIntGauge,
pub(crate) hits_blobs: IntCounter,
pub(crate) hits_bytes: IntCounter,
pub(crate) evictions: IntCounter,
}
impl BlobMemCache {
fn new(registry: &MetricsRegistry) -> Self {
BlobMemCache {
size_blobs: registry.register(metric!(
name: "mz_persist_blob_cache_size_blobs",
help: "count of blobs in the cache",
const_labels: {"cache" => "mem"},
)),
size_bytes: registry.register(metric!(
name: "mz_persist_blob_cache_size_bytes",
help: "total size of blobs in the cache",
const_labels: {"cache" => "mem"},
)),
hits_blobs: registry.register(metric!(
name: "mz_persist_blob_cache_hits_blobs",
help: "count of blobs served via cache instead of s3",
const_labels: {"cache" => "mem"},
)),
hits_bytes: registry.register(metric!(
name: "mz_persist_blob_cache_hits_bytes",
help: "total size of blobs served via cache instead of s3",
const_labels: {"cache" => "mem"},
)),
evictions: registry.register(metric!(
name: "mz_persist_blob_cache_evictions",
help: "count of capacity-based cache evictions",
const_labels: {"cache" => "mem"},
)),
}
}
}
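/// Lazily initialized semaphores for expensive operations. The fetch
/// semaphore's permit count is derived from the announced memory limit and
/// dyncfg values, so construction is deferred until configs have synced once;
/// when no limit is announced or cc is inactive, it is effectively unlimited.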
#[derive(Debug)]
pub struct SemaphoreMetrics {
cfg: PersistConfig,
registry: MetricsRegistry,
fetch: OnceCell<MetricsSemaphore>,
}
impl SemaphoreMetrics {
fn new(cfg: PersistConfig, registry: MetricsRegistry) -> Self {
SemaphoreMetrics {
cfg,
registry,
fetch: OnceCell::new(),
}
}
async fn fetch(&self) -> &MetricsSemaphore {
if let Some(x) = self.fetch.get() {
return x;
}
let cfg = self.cfg.clone();
let registry = self.registry.clone();
let init = async move {
let total_permits = match cfg.announce_memory_limit {
Some(mem) if cfg.is_cc_active => {
info!("fetch semaphore awaiting first dyncfg values");
let () = cfg.configs_synced_once().await;
let total_permits = usize::cast_lossy(
f64::cast_lossy(mem) * FETCH_SEMAPHORE_PERMIT_ADJUSTMENT.get(&cfg),
);
info!("fetch_semaphore got first dyncfg values");
total_permits
}
Some(_) | None => Semaphore::MAX_PERMITS,
};
MetricsSemaphore::new(&registry, "fetch", total_permits)
};
self.fetch.get_or_init(|| init).await
}
pub(crate) async fn acquire_fetch_permits(&self, encoded_size_bytes: usize) -> MetricsPermits {
let requested_permits = f64::cast_lossy(encoded_size_bytes);
let requested_permits = requested_permits * FETCH_SEMAPHORE_COST_ADJUSTMENT.get(&self.cfg);
let requested_permits = usize::cast_lossy(requested_permits);
self.fetch().await.acquire_permits(requested_permits).await
}
}
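/// A tokio [`Semaphore`] wrapped with metrics: acquire call counts, blocking
/// counts and time, running totals of acquired and released permits, and a
/// computed gauge of currently-available permits. Requests are clamped to the
/// total permit count so a single oversized request cannot block forever.
///
/// A minimal usage sketch (illustrative only; assumes an async context and an
/// existing `MetricsRegistry` named `registry`):
///
/// ```ignore
/// let semaphore = MetricsSemaphore::new(&registry, "example", 1 << 20);
/// // Permits are released (and counted) when the returned guard is dropped.
/// let _guard = semaphore.acquire_permits(4096).await;
/// ```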
#[derive(Debug)]
pub struct MetricsSemaphore {
name: &'static str,
semaphore: Arc<Semaphore>,
total_permits: usize,
acquire_count: IntCounter,
blocking_count: IntCounter,
blocking_seconds: Counter,
acquired_permits: IntCounter,
released_permits: IntCounter,
_available_permits: ComputedUIntGauge,
}
impl MetricsSemaphore {
pub fn new(registry: &MetricsRegistry, name: &'static str, total_permits: usize) -> Self {
let total_permits = std::cmp::min(total_permits, Semaphore::MAX_PERMITS);
let semaphore = Arc::new(Semaphore::new(total_permits));
MetricsSemaphore {
name,
total_permits,
acquire_count: registry.register(metric!(
name: "mz_persist_semaphore_acquire_count",
help: "count of acquire calls (not acquired permits count)",
const_labels: {"name" => name},
)),
blocking_count: registry.register(metric!(
name: "mz_persist_semaphore_blocking_count",
help: "count of acquire calls that had to block",
const_labels: {"name" => name},
)),
blocking_seconds: registry.register(metric!(
name: "mz_persist_semaphore_blocking_seconds",
help: "total time spent blocking on permit acquisition",
const_labels: {"name" => name},
)),
acquired_permits: registry.register(metric!(
name: "mz_persist_semaphore_acquired_permits",
help: "total sum of acquired permits",
const_labels: {"name" => name},
)),
released_permits: registry.register(metric!(
name: "mz_persist_semaphore_released_permits",
help: "total sum of released permits",
const_labels: {"name" => name},
)),
_available_permits: registry.register_computed_gauge(
metric!(
name: "mz_persist_semaphore_available_permits",
help: "currently available permits according to the semaphore",
),
{
let semaphore = Arc::clone(&semaphore);
move || u64::cast_from(semaphore.available_permits())
},
),
semaphore,
}
}
pub async fn acquire_permits(&self, requested_permits: usize) -> MetricsPermits {
let total_permits = u32::try_from(self.total_permits).unwrap_or(u32::MAX);
let requested_permits = u32::try_from(requested_permits).unwrap_or(u32::MAX);
let requested_permits = std::cmp::min(requested_permits, total_permits);
let wrap = |_permit| {
self.acquired_permits.inc_by(u64::from(requested_permits));
MetricsPermits {
_permit,
released_metric: self.released_permits.clone(),
count: requested_permits,
}
};
self.acquire_count.inc();
// Fast path: grab the permits without blocking if they're available.
if let Ok(permit) = Arc::clone(&self.semaphore).try_acquire_many_owned(requested_permits) {
return wrap(permit);
}
self.blocking_count.inc();
let start = Instant::now();
let ret = Arc::clone(&self.semaphore)
.acquire_many_owned(requested_permits)
.instrument(info_span!("acquire_permits"))
.await;
let elapsed = start.elapsed();
self.blocking_seconds.inc_by(elapsed.as_secs_f64());
debug!(
"acquisition of {} {} permits blocked for {:?}",
self.name, requested_permits, elapsed
);
wrap(ret.expect("semaphore is never closed"))
}
}
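/// An RAII guard around owned semaphore permits; dropping it returns the
/// permits to the semaphore and records them as released.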
#[derive(Debug)]
pub struct MetricsPermits {
_permit: OwnedSemaphorePermit,
released_metric: IntCounter,
count: u32,
}
impl Drop for MetricsPermits {
fn drop(&mut self) {
self.released_metric.inc_by(u64::from(self.count))
}
}
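/// Instrumentation shared by all external (blob and consensus) operations:
/// start/success/failure counts, byte and wall-clock totals, an optional
/// latency histogram, and a hook into [`AlertsMetrics`] on error.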
#[derive(Debug)]
pub struct ExternalOpMetrics {
started: IntCounter,
succeeded: IntCounter,
failed: IntCounter,
bytes: IntCounter,
seconds: Counter,
seconds_histogram: Option<Histogram>,
alerts_metrics: Arc<AlertsMetrics>,
}
impl ExternalOpMetrics {
async fn run_op<R, F, OpFn, ErrFn>(
&self,
op_fn: OpFn,
on_err_fn: ErrFn,
) -> Result<R, ExternalError>
where
F: std::future::Future<Output = Result<R, ExternalError>>,
OpFn: FnOnce() -> F,
ErrFn: FnOnce(&AlertsMetrics, &ExternalError),
{
self.started.inc();
let start = Instant::now();
let res = op_fn().await;
let elapsed_seconds = start.elapsed().as_secs_f64();
self.seconds.inc_by(elapsed_seconds);
if let Some(h) = &self.seconds_histogram {
h.observe(elapsed_seconds);
}
match res.as_ref() {
Ok(_) => self.succeeded.inc(),
Err(err) => {
self.failed.inc();
on_err_fn(&self.alerts_metrics, err);
}
};
res
}
fn run_stream<'a, R: 'a, S, OpFn, ErrFn>(
&'a self,
op_fn: OpFn,
mut on_err_fn: ErrFn,
) -> impl futures::Stream<Item = Result<R, ExternalError>> + 'a
where
S: futures::Stream<Item = Result<R, ExternalError>> + Unpin + 'a,
OpFn: FnOnce() -> S,
ErrFn: FnMut(&AlertsMetrics, &ExternalError) + 'a,
{
self.started.inc();
let start = Instant::now();
let mut stream = op_fn();
stream! {
let mut succeeded = true;
while let Some(res) = stream.next().await {
if let Err(err) = res.as_ref() {
on_err_fn(&self.alerts_metrics, err);
succeeded = false;
}
yield res;
}
if succeeded {
self.succeeded.inc()
} else {
self.failed.inc()
}
let elapsed_seconds = start.elapsed().as_secs_f64();
self.seconds.inc_by(elapsed_seconds);
if let Some(h) = &self.seconds_histogram {
h.observe(elapsed_seconds);
}
}
}
}
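/// Per-operation metrics for a [`Blob`] implementation, plus blob size and
/// round-trip-latency tracking.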
#[derive(Debug)]
pub struct BlobMetrics {
set: ExternalOpMetrics,
get: ExternalOpMetrics,
list_keys: ExternalOpMetrics,
delete: ExternalOpMetrics,
restore: ExternalOpMetrics,
delete_noop: IntCounter,
blob_sizes: Histogram,
pub rtt_latency: Gauge,
}
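/// A [`Blob`] wrapper that instruments each operation of an inner blob with
/// the metrics above and feeds failures into [`AlertsMetrics`].
///
/// A minimal wiring sketch (illustrative only; `inner` and `metrics` are
/// assumed to already exist):
///
/// ```ignore
/// let blob: Arc<dyn Blob> = Arc::new(MetricsBlob::new(inner, Arc::clone(&metrics)));
/// ```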
#[derive(Debug)]
pub struct MetricsBlob {
blob: Arc<dyn Blob>,
metrics: Arc<Metrics>,
}
impl MetricsBlob {
pub fn new(blob: Arc<dyn Blob>, metrics: Arc<Metrics>) -> Self {
MetricsBlob { blob, metrics }
}
fn on_err(alerts_metrics: &AlertsMetrics, _err: &ExternalError) {
alerts_metrics.blob_failures.inc()
}
}
#[async_trait]
impl Blob for MetricsBlob {
#[instrument(name = "blob::get", fields(shard=blob_key_shard_id(key)))]
async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
let res = self
.metrics
.blob
.get
.run_op(|| self.blob.get(key), Self::on_err)
.await;
if let Ok(Some(value)) = res.as_ref() {
self.metrics
.blob
.get
.bytes
.inc_by(u64::cast_from(value.len()));
}
res
}
#[instrument(name = "blob::list_keys_and_metadata", fields(shard=blob_key_shard_id(key_prefix)))]
async fn list_keys_and_metadata(
&self,
key_prefix: &str,
f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
) -> Result<(), ExternalError> {
let mut byte_total = 0;
let mut instrumented = |blob_metadata: BlobMetadata| {
byte_total += blob_metadata.key.len();
f(blob_metadata)
};
let res = self
.metrics
.blob
.list_keys
.run_op(
|| {
self.blob
.list_keys_and_metadata(key_prefix, &mut instrumented)
},
Self::on_err,
)
.await;
self.metrics
.blob
.list_keys
.bytes
.inc_by(u64::cast_from(byte_total));
res
}
#[instrument(name = "blob::set", fields(shard=blob_key_shard_id(key),size_bytes=value.len()))]
async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
let bytes = value.len();
let res = self
.metrics
.blob
.set
.run_op(|| self.blob.set(key, value), Self::on_err)
.await;
if res.is_ok() {
self.metrics.blob.set.bytes.inc_by(u64::cast_from(bytes));
self.metrics.blob.blob_sizes.observe(f64::cast_lossy(bytes));
}
res
}
#[instrument(name = "blob::delete", fields(shard=blob_key_shard_id(key)))]
async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
let bytes = self
.metrics
.blob
.delete
.run_op(|| self.blob.delete(key), Self::on_err)
.await?;
if let Some(bytes) = bytes {
self.metrics.blob.delete.bytes.inc_by(u64::cast_from(bytes));
} else {
self.metrics.blob.delete_noop.inc();
}
Ok(bytes)
}
async fn restore(&self, key: &str) -> Result<(), ExternalError> {
self.metrics
.blob
.restore
.run_op(|| self.blob.restore(key), Self::on_err)
.await
}
}
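/// Per-operation metrics for a [`Consensus`] implementation, plus truncation
/// and round-trip-latency tracking.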
#[derive(Debug)]
pub struct ConsensusMetrics {
list_keys: ExternalOpMetrics,
head: ExternalOpMetrics,
compare_and_set: ExternalOpMetrics,
scan: ExternalOpMetrics,
truncate: ExternalOpMetrics,
truncated_count: IntCounter,
pub rtt_latency: Gauge,
}
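/// A [`Consensus`] wrapper that instruments each operation of an inner
/// consensus implementation, counting bytes only for operations that actually
/// returned or committed data.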
#[derive(Debug)]
pub struct MetricsConsensus {
consensus: Arc<dyn Consensus>,
metrics: Arc<Metrics>,
}
impl MetricsConsensus {
pub fn new(consensus: Arc<dyn Consensus>, metrics: Arc<Metrics>) -> Self {
MetricsConsensus { consensus, metrics }
}
fn on_err(alerts_metrics: &AlertsMetrics, err: &ExternalError) {
// Determinate errors are expected during normal operation, so only
// indeterminate failures feed the alert counter.
if let ExternalError::Indeterminate(_) = err {
alerts_metrics.consensus_failures.inc()
}
}
}
#[async_trait]
impl Consensus for MetricsConsensus {
fn list_keys(&self) -> ResultStream<String> {
Box::pin(
self.metrics
.consensus
.list_keys
.run_stream(|| self.consensus.list_keys(), Self::on_err),
)
}
#[instrument(name = "consensus::head", fields(shard=key))]
async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
let res = self
.metrics
.consensus
.head
.run_op(|| self.consensus.head(key), Self::on_err)
.await;
if let Ok(Some(data)) = res.as_ref() {
self.metrics
.consensus
.head
.bytes
.inc_by(u64::cast_from(data.data.len()));
}
res
}
#[instrument(name = "consensus::compare_and_set", fields(shard=key,size_bytes=new.data.len()))]
async fn compare_and_set(
&self,
key: &str,
expected: Option<SeqNo>,
new: VersionedData,
) -> Result<CaSResult, ExternalError> {
let bytes = new.data.len();
let res = self
.metrics
.consensus
.compare_and_set
.run_op(
|| self.consensus.compare_and_set(key, expected, new),
Self::on_err,
)
.await;
match res.as_ref() {
Ok(CaSResult::Committed) => self
.metrics
.consensus
.compare_and_set
.bytes
.inc_by(u64::cast_from(bytes)),
Ok(CaSResult::ExpectationMismatch) | Err(_) => {}
}
res
}
#[instrument(name = "consensus::scan", fields(shard=key))]
async fn scan(
&self,
key: &str,
from: SeqNo,
limit: usize,
) -> Result<Vec<VersionedData>, ExternalError> {
let res = self
.metrics
.consensus
.scan
.run_op(|| self.consensus.scan(key, from, limit), Self::on_err)
.await;
if let Ok(dataz) = res.as_ref() {
let bytes: usize = dataz.iter().map(|x| x.data.len()).sum();
self.metrics
.consensus
.scan
.bytes
.inc_by(u64::cast_from(bytes));
}
res
}
#[instrument(name = "consensus::truncate", fields(shard=key))]
async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError> {
let deleted = self
.metrics
.consensus
.truncate
.run_op(|| self.consensus.truncate(key, seqno), Self::on_err)
.await?;
self.metrics
.consensus
.truncated_count
.inc_by(u64::cast_from(deleted));
Ok(deleted)
}
}
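/// A prometheus [`Collector`] that exports cumulative `tokio_metrics`
/// [`TaskMonitor`] statistics as gauges; values are sampled from the monitor
/// at collect time.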
#[derive(Debug, Clone)]
pub struct TaskMetrics {
f64_gauges: Vec<(Gauge, fn(&tokio_metrics::TaskMetrics) -> f64)>,
u64_gauges: Vec<(
GenericGauge<AtomicU64>,
fn(&tokio_metrics::TaskMetrics) -> u64,
)>,
monitor: TaskMonitor,
}
impl TaskMetrics {
pub fn new(name: &str) -> Self {
let monitor = TaskMonitor::new();
Self {
f64_gauges: vec![
(
Gauge::make_collector(metric!(
name: "mz_persist_task_total_idle_duration",
help: "Seconds of time spent idling, ie. waiting for a task to be woken up.",
const_labels: {"name" => name}
)),
|m| m.total_idle_duration.as_secs_f64(),
),
(
Gauge::make_collector(metric!(
name: "mz_persist_task_total_scheduled_duration",
help: "Seconds of time spent scheduled, ie. ready to poll but not yet polled.",
const_labels: {"name" => name}
)),
|m| m.total_scheduled_duration.as_secs_f64(),
),
],
u64_gauges: vec![
(
MakeCollector::make_collector(metric!(
name: "mz_persist_task_total_scheduled_count",
help: "The total number of task schedules. Useful for computing the average scheduled time.",
const_labels: {"name" => name}
)),
|m| m.total_scheduled_count,
),
(
MakeCollector::make_collector(metric!(
name: "mz_persist_task_total_idled_count",
help: "The total number of task idles. Useful for computing the average idle time.",
const_labels: {"name" => name}
,
)),
|m| m.total_idled_count,
),
],
monitor,
}
}
pub fn instrument_task<F>(&self, task: F) -> tokio_metrics::Instrumented<F> {
TaskMonitor::instrument(&self.monitor, task)
}
}
impl Collector for TaskMetrics {
fn desc(&self) -> Vec<&Desc> {
let mut descs = Vec::with_capacity(self.f64_gauges.len() + self.u64_gauges.len());
for (g, _) in &self.f64_gauges {
descs.extend(g.desc());
}
for (g, _) in &self.u64_gauges {
descs.extend(g.desc());
}
descs
}
fn collect(&self) -> Vec<MetricFamily> {
let mut families = Vec::with_capacity(self.f64_gauges.len() + self.u64_gauges.len());
let metrics = self.monitor.cumulative();
for (g, metrics_fn) in &self.f64_gauges {
g.set(metrics_fn(&metrics));
families.extend(g.collect());
}
for (g, metrics_fn) in &self.u64_gauges {
g.set(metrics_fn(&metrics));
families.extend(g.collect());
}
families
}
}
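/// Named task monitors registered as collectors; use `instrument_task` on the
/// matching [`TaskMetrics`] to attach a spawned future to its monitor.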
#[derive(Debug)]
pub struct TasksMetrics {
pub heartbeat_read: TaskMetrics,
}
impl TasksMetrics {
fn new(registry: &MetricsRegistry) -> Self {
let heartbeat_read = TaskMetrics::new("heartbeat_read");
registry.register_collector(heartbeat_read.clone());
TasksMetrics { heartbeat_read }
}
}
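/// Metrics for the schema cache and for schema migrations applied while
/// fetching parts.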
#[derive(Debug)]
pub struct SchemaMetrics {
pub(crate) cache_fetch_state_count: IntCounter,
pub(crate) cache_schema: SchemaCacheMetrics,
pub(crate) cache_migration: SchemaCacheMetrics,
pub(crate) migration_count_same: IntCounter,
pub(crate) migration_count_codec: IntCounter,
pub(crate) migration_count_either: IntCounter,
pub(crate) migration_len_legacy_codec: IntCounter,
pub(crate) migration_len_either_codec: IntCounter,
pub(crate) migration_len_either_arrow: IntCounter,
pub(crate) migration_new_count: IntCounter,
pub(crate) migration_new_seconds: Counter,
pub(crate) migration_migrate_seconds: Counter,
}
impl SchemaMetrics {
fn new(registry: &MetricsRegistry) -> Self {
let cached: IntCounterVec = registry.register(metric!(
name: "mz_persist_schema_cache_cached_count",
help: "count of schema cache entries served from cache",
var_labels: ["op"],
));
let computed: IntCounterVec = registry.register(metric!(
name: "mz_persist_schema_cache_computed_count",
help: "count of schema cache entries computed",
var_labels: ["op"],
));
let unavailable: IntCounterVec = registry.register(metric!(
name: "mz_persist_schema_cache_unavailable_count",
help: "count of schema cache entries unavailable at current state",
var_labels: ["op"],
));
let added: IntCounterVec = registry.register(metric!(
name: "mz_persist_schema_cache_added_count",
help: "count of schema cache entries added",
var_labels: ["op"],
));
let dropped: IntCounterVec = registry.register(metric!(
name: "mz_persist_schema_cache_dropped_count",
help: "count of schema cache entries dropped",
var_labels: ["op"],
));
let cache = |name| SchemaCacheMetrics {
cached_count: cached.with_label_values(&[name]),
computed_count: computed.with_label_values(&[name]),
unavailable_count: unavailable.with_label_values(&[name]),
added_count: added.with_label_values(&[name]),
dropped_count: dropped.with_label_values(&[name]),
};
let migration_count: IntCounterVec = registry.register(metric!(
name: "mz_persist_schema_migration_count",
help: "count of fetch part migrations",
var_labels: ["op"],
));
let migration_len: IntCounterVec = registry.register(metric!(
name: "mz_persist_schema_migration_len",
help: "count of migrated update records",
var_labels: ["op"],
));
SchemaMetrics {
cache_fetch_state_count: registry.register(metric!(
name: "mz_persist_schema_cache_fetch_state_count",
help: "count of state fetches by the schema cache",
)),
cache_schema: cache("schema"),
cache_migration: cache("migration"),
migration_count_same: migration_count.with_label_values(&["same"]),
migration_count_codec: migration_count.with_label_values(&["codec"]),
migration_count_either: migration_count.with_label_values(&["either"]),
migration_len_legacy_codec: migration_len.with_label_values(&["legacy_codec"]),
migration_len_either_codec: migration_len.with_label_values(&["either_codec"]),
migration_len_either_arrow: migration_len.with_label_values(&["either_arrow"]),
migration_new_count: registry.register(metric!(
name: "mz_persist_schema_migration_new_count",
help: "count of migrations constructed",
)),
migration_new_seconds: registry.register(metric!(
name: "mz_persist_schema_migration_new_seconds",
help: "seconds spent constructing migration logic",
)),
migration_migrate_seconds: registry.register(metric!(
name: "mz_persist_schema_migration_migrate_seconds",
help: "seconds spent applying migration logic",
)),
}
}
}
#[derive(Debug, Clone)]
pub struct SchemaCacheMetrics {
pub(crate) cached_count: IntCounter,
pub(crate) computed_count: IntCounter,
pub(crate) unavailable_count: IntCounter,
pub(crate) added_count: IntCounter,
pub(crate) dropped_count: IntCounter,
}
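/// Metrics for inline batch parts: how many (and how large) were committed
/// directly to state, and the write metrics for backpressured flushes to blob.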
#[derive(Debug)]
pub struct InlineMetrics {
pub(crate) part_commit_count: IntCounter,
pub(crate) part_commit_bytes: IntCounter,
pub(crate) backpressure: BatchWriteMetrics,
}
impl InlineMetrics {
fn new(registry: &MetricsRegistry) -> Self {
InlineMetrics {
part_commit_count: registry.register(metric!(
name: "mz_persist_inline_part_commit_count",
help: "count of inline parts committed to state",
)),
part_commit_bytes: registry.register(metric!(
name: "mz_persist_inline_part_commit_bytes",
help: "total size of of inline parts committed to state",
)),
backpressure: BatchWriteMetrics::new(registry, "inline_backpressure"),
}
}
}
fn blob_key_shard_id(key: &str) -> Option<String> {
let (shard_id, _) = BlobKey::parse_ids(key).ok()?;
Some(shard_id.to_string())
}
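/// Encodes a frontier for use as a metrics gauge value: the `Codec64`
/// encoding of the antichain's first element reinterpreted as an `i64`, or
/// `i64::MAX` when the antichain is empty (i.e. the frontier is closed).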
pub fn encode_ts_metric<T: Codec64>(ts: &Antichain<T>) -> i64 {
match ts.elements().first() {
Some(ts) => i64::from_le_bytes(Codec64::encode(ts)),
None => i64::MAX,
}
}