use std::sync::Arc;
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::metric;
use mz_ore::metrics::{
CounterVecExt, DeleteOnDropCounter, DeleteOnDropHistogram, HistogramVecExt, IntCounterVec,
MetricsRegistry,
};
use mz_ore::stats::HISTOGRAM_BYTE_BUCKETS;
use mz_service::codec::StatsCollector;
use mz_storage_types::instances::StorageInstanceId;
use crate::client::{ProtoStorageCommand, ProtoStorageResponse};
#[derive(Debug, Clone)]
pub struct StorageControllerMetrics {
messages_sent_bytes: prometheus::HistogramVec,
messages_received_bytes: prometheus::HistogramVec,
startup_prepared_statements_kept: prometheus::IntGauge,
regressed_offset_known: IntCounterVec,
}
impl StorageControllerMetrics {
pub fn new(metrics_registry: MetricsRegistry) -> Self {
Self {
messages_sent_bytes: metrics_registry.register(metric!(
name: "mz_storage_messages_sent_bytes",
help: "size of storage messages sent",
var_labels: ["instance"],
buckets: HISTOGRAM_BYTE_BUCKETS.to_vec()
)),
messages_received_bytes: metrics_registry.register(metric!(
name: "mz_storage_messages_received_bytes",
help: "size of storage messages received",
var_labels: ["instance"],
buckets: HISTOGRAM_BYTE_BUCKETS.to_vec()
)),
startup_prepared_statements_kept: metrics_registry.register(metric!(
name: "mz_storage_startup_prepared_statements_kept",
help: "number of prepared statements kept on startup",
)),
regressed_offset_known: metrics_registry.register(metric!(
name: "mz_storage_regressed_offset_known",
help: "number of regressed offset_known stats for this id",
var_labels: ["id"],
)),
}
}
pub fn regressed_offset_known(
&self,
id: mz_repr::GlobalId,
) -> DeleteOnDropCounter<'static, prometheus::core::AtomicU64, Vec<String>> {
self.regressed_offset_known
.get_delete_on_drop_counter(vec![id.to_string()])
}
pub fn for_instance(&self, id: StorageInstanceId) -> RehydratingStorageClientMetrics {
let labels = vec![id.to_string()];
RehydratingStorageClientMetrics {
inner: Arc::new(RehydratingStorageClientMetricsInner {
messages_sent_bytes: self
.messages_sent_bytes
.get_delete_on_drop_histogram(labels.clone()),
messages_received_bytes: self
.messages_received_bytes
.get_delete_on_drop_histogram(labels),
}),
}
}
pub fn set_startup_prepared_statements_kept(&self, n: u64) {
let n: i64 = n.try_into().expect("realistic number");
self.startup_prepared_statements_kept.set(n);
}
}
#[derive(Debug)]
struct RehydratingStorageClientMetricsInner {
messages_sent_bytes: DeleteOnDropHistogram<'static, Vec<String>>,
messages_received_bytes: DeleteOnDropHistogram<'static, Vec<String>>,
}
#[derive(Debug, Clone)]
pub struct RehydratingStorageClientMetrics {
inner: Arc<RehydratingStorageClientMetricsInner>,
}
impl StatsCollector<ProtoStorageCommand, ProtoStorageResponse> for RehydratingStorageClientMetrics {
fn send_event(&self, _item: &ProtoStorageCommand, size: usize) {
match f64::try_cast_from(u64::cast_from(size)) {
Some(x) => self.inner.messages_sent_bytes.observe(x),
None => tracing::warn!(
"{} has no precise representation as f64, ignoring message",
size
),
}
}
fn receive_event(&self, _item: &ProtoStorageResponse, size: usize) {
match f64::try_cast_from(u64::cast_from(size)) {
Some(x) => self.inner.messages_received_bytes.observe(x),
None => tracing::warn!(
"{} has no precise representation as f64, ignoring message",
size
),
}
}
}