//! Worker-local statistics for sources and sinks: per-object records, the
//! corresponding delete-on-drop Prometheus metrics, and the machinery for
//! aggregating the records of all workers into cluster-wide updates.

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;
use std::time::Instant;
use mz_ore::metric;
use mz_ore::metrics::{
DeleteOnDropCounter, DeleteOnDropGauge, IntCounterVec, IntGaugeVec, MetricsRegistry,
UIntGaugeVec,
};
use mz_repr::{GlobalId, Timestamp};
use mz_storage_client::statistics::{Gauge, SinkStatisticsUpdate, SourceStatisticsUpdate};
use mz_storage_types::sources::SourceEnvelope;
use prometheus::core::{AtomicI64, AtomicU64};
use serde::{Deserialize, Serialize};
use timely::progress::frontier::Antichain;
use timely::PartialOrder;
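/// Definitions (counter and gauge vectors) of the per-source Prometheus
/// metrics, registered once per process. Labeled instances for an individual
/// source are created by `SourceStatisticsMetrics::new`.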
#[derive(Clone, Debug)]
pub(crate) struct SourceStatisticsMetricDefs {
pub(crate) messages_received: IntCounterVec,
pub(crate) updates_staged: IntCounterVec,
pub(crate) updates_committed: IntCounterVec,
pub(crate) bytes_received: IntCounterVec,
pub(crate) snapshot_committed: UIntGaugeVec,
pub(crate) bytes_indexed: UIntGaugeVec,
pub(crate) records_indexed: UIntGaugeVec,
pub(crate) rehydration_latency_ms: IntGaugeVec,
pub(crate) offset_known: UIntGaugeVec,
pub(crate) offset_committed: UIntGaugeVec,
pub(crate) snapshot_records_known: UIntGaugeVec,
pub(crate) snapshot_records_staged: UIntGaugeVec,
pub(crate) envelope_state_tombstones: UIntGaugeVec,
}
impl SourceStatisticsMetricDefs {
pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
Self {
snapshot_committed: registry.register(metric!(
name: "mz_source_snapshot_committed",
help: "Whether or not the worker has committed the initial snapshot for a source.",
var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
)),
messages_received: registry.register(metric!(
name: "mz_source_messages_received",
help: "The number of raw messages the worker has received from upstream.",
var_labels: ["source_id", "worker_id", "parent_source_id"],
)),
updates_staged: registry.register(metric!(
name: "mz_source_updates_staged",
help: "The number of updates (inserts + deletes) the worker has written but not yet committed to the storage layer.",
var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
)),
updates_committed: registry.register(metric!(
name: "mz_source_updates_committed",
help: "The number of updates (inserts + deletes) the worker has committed into the storage layer.",
var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
)),
bytes_received: registry.register(metric!(
name: "mz_source_bytes_received",
help: "The number of bytes worth of messages the worker has received from upstream. The way the bytes are counted is source-specific.",
var_labels: ["source_id", "worker_id", "parent_source_id"],
)),
bytes_indexed: registry.register(metric!(
name: "mz_source_bytes_indexed",
help: "The number of bytes of the source envelope state kept. This will be specific to the envelope in use.",
var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
)),
records_indexed: registry.register(metric!(
name: "mz_source_records_indexed",
help: "The number of records in the source envelope state. This will be specific to the envelope in use",
var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
)),
envelope_state_tombstones: registry.register(metric!(
name: "mz_source_envelope_state_tombstones",
help: "The number of outstanding tombstones in the source envelope state. This will be specific to the envelope in use",
var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
)),
rehydration_latency_ms: registry.register(metric!(
name: "mz_source_rehydration_latency_ms",
help: "The amount of time in milliseconds it took for the worker to rehydrate the source envelope state. This will be specific to the envelope in use.",
var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id", "envelope"],
)),
            offset_known: registry.register(metric!(
                name: "mz_source_offset_known",
                help: "The total number of _values_ (source-defined unit) present in the upstream source.",
                var_labels: ["source_id", "worker_id", "parent_source_id"],
            )),
            offset_committed: registry.register(metric!(
                name: "mz_source_offset_committed",
                help: "The total number of _values_ (source-defined unit) the worker has fully processed and committed to storage.",
                var_labels: ["source_id", "worker_id", "parent_source_id"],
            )),
            snapshot_records_known: registry.register(metric!(
                name: "mz_source_snapshot_records_known",
                help: "The total number of records in the source's snapshot.",
                var_labels: ["source_id", "worker_id", "parent_source_id"],
            )),
            snapshot_records_staged: registry.register(metric!(
                name: "mz_source_snapshot_records_staged",
                help: "The total number of records read from the source's snapshot.",
                var_labels: ["source_id", "worker_id", "parent_source_id"],
            )),
}
}
}
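/// Prometheus metrics for a single source instance. Each labeled metric is
/// removed from its parent metric vec when this struct is dropped.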
#[derive(Debug)]
pub struct SourceStatisticsMetrics {
pub(crate) messages_received: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
pub(crate) updates_staged: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
pub(crate) updates_committed: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
pub(crate) bytes_received: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
pub(crate) snapshot_committed: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub(crate) bytes_indexed: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub(crate) records_indexed: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub(crate) rehydration_latency_ms: DeleteOnDropGauge<'static, AtomicI64, Vec<String>>,
pub(crate) offset_known: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub(crate) offset_committed: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub(crate) snapshot_records_known: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub(crate) snapshot_records_staged: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
pub(crate) envelope_state_tombstones: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
}
impl SourceStatisticsMetrics {
pub(crate) fn new(
defs: &SourceStatisticsMetricDefs,
id: GlobalId,
worker_id: usize,
parent_source_id: GlobalId,
shard_id: &mz_persist_client::ShardId,
envelope: SourceEnvelope,
) -> SourceStatisticsMetrics {
let shard = shard_id.to_string();
let envelope = match envelope {
SourceEnvelope::None(_) => "none",
SourceEnvelope::Upsert(_) => "upsert",
SourceEnvelope::CdcV2 => "cdcv2",
};
SourceStatisticsMetrics {
snapshot_committed: defs.snapshot_committed.get_delete_on_drop_metric(vec![
id.to_string(),
worker_id.to_string(),
parent_source_id.to_string(),
shard.clone(),
]),
messages_received: defs.messages_received.get_delete_on_drop_metric(vec![
id.to_string(),
worker_id.to_string(),
parent_source_id.to_string(),
]),
updates_staged: defs.updates_staged.get_delete_on_drop_metric(vec![
id.to_string(),
worker_id.to_string(),
parent_source_id.to_string(),
shard.clone(),
]),
updates_committed: defs.updates_committed.get_delete_on_drop_metric(vec![
id.to_string(),
worker_id.to_string(),
parent_source_id.to_string(),
shard.clone(),
]),
bytes_received: defs.bytes_received.get_delete_on_drop_metric(vec![
id.to_string(),
worker_id.to_string(),
parent_source_id.to_string(),
]),
bytes_indexed: defs.bytes_indexed.get_delete_on_drop_metric(vec![
id.to_string(),
worker_id.to_string(),
parent_source_id.to_string(),
shard.clone(),
]),
records_indexed: defs.records_indexed.get_delete_on_drop_metric(vec![
id.to_string(),
worker_id.to_string(),
parent_source_id.to_string(),
shard.clone(),
]),
envelope_state_tombstones: defs.envelope_state_tombstones.get_delete_on_drop_metric(
vec![
id.to_string(),
worker_id.to_string(),
parent_source_id.to_string(),
shard.clone(),
],
),
rehydration_latency_ms: defs.rehydration_latency_ms.get_delete_on_drop_metric(vec![
id.to_string(),
worker_id.to_string(),
parent_source_id.to_string(),
shard.clone(),
envelope.to_string(),
]),
offset_known: defs.offset_known.get_delete_on_drop_metric(vec![
id.to_string(),
worker_id.to_string(),
parent_source_id.to_string(),
]),
offset_committed: defs.offset_committed.get_delete_on_drop_metric(vec![
id.to_string(),
worker_id.to_string(),
parent_source_id.to_string(),
]),
snapshot_records_known: defs.snapshot_records_known.get_delete_on_drop_metric(vec![
id.to_string(),
worker_id.to_string(),
parent_source_id.to_string(),
]),
snapshot_records_staged: defs.snapshot_records_staged.get_delete_on_drop_metric(vec![
id.to_string(),
worker_id.to_string(),
parent_source_id.to_string(),
]),
}
}
}
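/// Definitions (counter vectors) of the per-sink Prometheus metrics,
/// registered once per process.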
#[derive(Clone, Debug)]
pub(crate) struct SinkStatisticsMetricDefs {
pub(crate) messages_staged: IntCounterVec,
pub(crate) messages_committed: IntCounterVec,
pub(crate) bytes_staged: IntCounterVec,
pub(crate) bytes_committed: IntCounterVec,
}
impl SinkStatisticsMetricDefs {
pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
Self {
messages_staged: registry.register(metric!(
name: "mz_sink_messages_staged",
help: "The number of messages staged but possibly not committed to the sink.",
var_labels: ["sink_id", "worker_id"],
)),
messages_committed: registry.register(metric!(
name: "mz_sink_messages_committed",
help: "The number of messages committed to the sink.",
var_labels: ["sink_id", "worker_id"],
)),
bytes_staged: registry.register(metric!(
name: "mz_sink_bytes_staged",
help: "The number of bytes staged but possibly not committed to the sink.",
var_labels: ["sink_id", "worker_id"],
)),
bytes_committed: registry.register(metric!(
name: "mz_sink_bytes_committed",
help: "The number of bytes committed to the sink.",
var_labels: ["sink_id", "worker_id"],
)),
}
}
}
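/// Prometheus metrics for a single sink instance, removed from their parent
/// metric vecs when this struct is dropped.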
#[derive(Debug)]
pub struct SinkStatisticsMetrics {
pub(crate) messages_staged: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
pub(crate) messages_committed: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
pub(crate) bytes_staged: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
pub(crate) bytes_committed: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
}
impl SinkStatisticsMetrics {
pub(crate) fn new(
defs: &SinkStatisticsMetricDefs,
id: GlobalId,
worker_id: usize,
) -> SinkStatisticsMetrics {
SinkStatisticsMetrics {
messages_staged: defs
.messages_staged
.get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
messages_committed: defs
.messages_committed
.get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
bytes_staged: defs
.bytes_staged
.get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
bytes_committed: defs
.bytes_committed
.get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
}
}
}
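/// Metadata needed to maintain source statistics, captured when the source
/// (re)starts.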
#[derive(Debug, Clone)]
pub struct SourceStatisticsMetadata {
resume_upper: Antichain<Timestamp>,
created_at: Instant,
}
impl SourceStatisticsMetadata {
pub fn new(resume_upper: Antichain<Timestamp>) -> Self {
Self {
resume_upper,
created_at: Instant::now(),
}
}
}
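/// Pairs a statistics record with its Prometheus metrics so the two are
/// always updated together.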
#[derive(Debug)]
struct StatsInner<Stats, Metrics> {
stats: Stats,
prom: Metrics,
}
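/// A shared handle to the statistics of a source or sink.
///
/// Clones share the same underlying record and metrics via `Rc<RefCell<_>>`;
/// this is single-threaded state owned by one timely worker.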
#[derive(Debug)]
pub struct StorageStatistics<Stats, Metrics, Meta> {
stats: Rc<RefCell<StatsInner<Stats, Metrics>>>,
meta: Meta,
}
impl<Stats, Metrics, Meta: Clone> Clone for StorageStatistics<Stats, Metrics, Meta> {
fn clone(&self) -> Self {
Self {
stats: Rc::clone(&self.stats),
meta: self.meta.clone(),
}
}
}
/// A record of the statistics a single worker maintains for a source.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SourceStatisticsRecord {
    id: GlobalId,
    worker_id: usize,
    // Counters. These are reset to 0 whenever the record is snapshotted.
    messages_received: u64,
    bytes_received: u64,
    updates_staged: u64,
    updates_committed: u64,
    // Gauges. The outermost `Option` tracks whether the gauge has been
    // initialized for this incarnation of the source; `snapshot` withholds
    // records until every gauge is initialized. Where present, the inner
    // `Option` tracks whether the value is currently known.
    records_indexed: Option<u64>,
    bytes_indexed: Option<u64>,
    rehydration_latency_ms: Option<Option<i64>>,
    snapshot_records_known: Option<Option<u64>>,
    snapshot_records_staged: Option<Option<u64>>,
    snapshot_committed: Option<bool>,
    offset_known: Option<Option<u64>>,
    offset_committed: Option<Option<u64>>,
    // Tracked only as a Prometheus metric; not included in
    // `SourceStatisticsUpdate`.
    envelope_state_tombstones: u64,
}
impl SourceStatisticsRecord {
fn clear(&mut self) {
self.messages_received = 0;
self.bytes_received = 0;
self.updates_staged = 0;
self.updates_committed = 0;
self.rehydration_latency_ms = None;
self.snapshot_committed = None;
self.bytes_indexed = Some(0);
self.records_indexed = Some(0);
self.snapshot_records_known = Some(None);
self.snapshot_records_staged = Some(None);
self.offset_known = Some(None);
self.offset_committed = Some(None);
self.envelope_state_tombstones = 0;
}
fn reset_counters(&mut self) {
self.messages_received = 0;
self.bytes_received = 0;
self.updates_staged = 0;
self.updates_committed = 0;
}
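    /// Converts this record into a `SourceStatisticsUpdate`.
    ///
    /// Panics if any gauge is uninitialized. Callers are expected to obtain
    /// records from `SourceStatistics::snapshot`, which returns them only
    /// once every gauge has been initialized.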
fn as_update(&self) -> SourceStatisticsUpdate {
let SourceStatisticsRecord {
id,
worker_id: _,
messages_received,
bytes_received,
updates_staged,
updates_committed,
records_indexed,
bytes_indexed,
rehydration_latency_ms,
snapshot_records_known,
snapshot_records_staged,
snapshot_committed,
offset_known,
offset_committed,
envelope_state_tombstones: _,
} = self.clone();
SourceStatisticsUpdate {
id,
messages_received: messages_received.into(),
bytes_received: bytes_received.into(),
updates_staged: updates_staged.into(),
updates_committed: updates_committed.into(),
records_indexed: Gauge::gauge(records_indexed.unwrap()),
bytes_indexed: Gauge::gauge(bytes_indexed.unwrap()),
rehydration_latency_ms: Gauge::gauge(rehydration_latency_ms.unwrap()),
snapshot_records_known: Gauge::gauge(snapshot_records_known.unwrap()),
snapshot_records_staged: Gauge::gauge(snapshot_records_staged.unwrap()),
snapshot_committed: Gauge::gauge(snapshot_committed.unwrap()),
offset_known: Gauge::gauge(offset_known.unwrap()),
offset_committed: Gauge::gauge(offset_committed.unwrap()),
}
}
}
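/// A record of the statistics a single worker maintains for a sink.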
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SinkStatisticsRecord {
id: GlobalId,
worker_id: usize,
messages_staged: u64,
messages_committed: u64,
bytes_staged: u64,
bytes_committed: u64,
}
impl SinkStatisticsRecord {
fn clear(&mut self) {
self.messages_staged = 0;
self.messages_committed = 0;
self.bytes_staged = 0;
self.bytes_committed = 0;
}
fn reset_counters(&mut self) {
self.messages_staged = 0;
self.messages_committed = 0;
self.bytes_staged = 0;
self.bytes_committed = 0;
}
fn as_update(&self) -> SinkStatisticsUpdate {
let SinkStatisticsRecord {
id,
worker_id: _,
messages_staged,
messages_committed,
bytes_staged,
bytes_committed,
} = self.clone();
SinkStatisticsUpdate {
id,
messages_staged: messages_staged.into(),
messages_committed: messages_committed.into(),
bytes_staged: bytes_staged.into(),
bytes_committed: bytes_committed.into(),
}
}
}
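/// Statistics maintained by a single worker for a source.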
pub type SourceStatistics =
StorageStatistics<SourceStatisticsRecord, SourceStatisticsMetrics, SourceStatisticsMetadata>;
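/// Statistics maintained by a single worker for a sink.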
pub type SinkStatistics = StorageStatistics<SinkStatisticsRecord, SinkStatisticsMetrics, ()>;
impl SourceStatistics {
pub(crate) fn new(
id: GlobalId,
worker_id: usize,
metrics: &SourceStatisticsMetricDefs,
parent_source_id: GlobalId,
shard_id: &mz_persist_client::ShardId,
envelope: SourceEnvelope,
resume_upper: Antichain<Timestamp>,
) -> Self {
Self {
stats: Rc::new(RefCell::new(StatsInner {
stats: SourceStatisticsRecord {
id,
worker_id,
messages_received: 0,
updates_staged: 0,
updates_committed: 0,
bytes_received: 0,
records_indexed: Some(0),
bytes_indexed: Some(0),
rehydration_latency_ms: None,
snapshot_records_staged: Some(None),
snapshot_records_known: Some(None),
snapshot_committed: None,
offset_known: Some(None),
offset_committed: Some(None),
envelope_state_tombstones: 0,
},
prom: SourceStatisticsMetrics::new(
metrics,
id,
worker_id,
parent_source_id,
shard_id,
envelope,
),
})),
meta: SourceStatisticsMetadata::new(resume_upper),
}
}
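    /// Resets the statistics to the values they had when this object was
    /// created, marking the `rehydration_latency_ms` and `snapshot_committed`
    /// gauges as uninitialized again.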
pub fn clear(&self) {
let mut cur = self.stats.borrow_mut();
cur.stats.clear();
}
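    /// Returns a copy of the current record and resets its counters, but only
    /// once every gauge has been initialized for this incarnation of the
    /// source; otherwise returns `None` and leaves the record untouched.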
pub fn snapshot(&self) -> Option<SourceStatisticsRecord> {
let mut cur = self.stats.borrow_mut();
match &cur.stats {
SourceStatisticsRecord {
records_indexed: Some(_),
bytes_indexed: Some(_),
rehydration_latency_ms: Some(_),
snapshot_records_known: Some(_),
snapshot_records_staged: Some(_),
snapshot_committed: Some(_),
offset_known: Some(_),
offset_committed: Some(_),
..
} => {
let ret = Some(cur.stats.clone());
cur.stats.reset_counters();
ret
}
_ => None,
}
}
pub fn initialize_snapshot_committed(&self, upper: &Antichain<Timestamp>) {
self.update_snapshot_committed(upper);
}
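    /// Records whether the initial snapshot has been committed, i.e. whether
    /// the given upper has advanced past the minimum timestamp.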
pub fn update_snapshot_committed(&self, upper: &Antichain<Timestamp>) {
let value = *upper != Antichain::from_elem(Timestamp::MIN);
let mut cur = self.stats.borrow_mut();
cur.stats.snapshot_committed = Some(value);
cur.prom.snapshot_committed.set(if value { 1 } else { 0 });
}
    pub fn inc_messages_received_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.messages_received += value;
        cur.prom.messages_received.inc_by(value);
    }
    pub fn inc_updates_staged_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.updates_staged += value;
        cur.prom.updates_staged.inc_by(value);
    }
    pub fn inc_updates_committed_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.updates_committed += value;
        cur.prom.updates_committed.inc_by(value);
    }
    pub fn inc_bytes_received_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.bytes_received += value;
        cur.prom.bytes_received.inc_by(value);
    }
pub fn update_bytes_indexed_by(&self, value: i64) {
let mut cur = self.stats.borrow_mut();
if let Some(updated) = cur
.stats
.bytes_indexed
.unwrap_or(0)
.checked_add_signed(value)
{
cur.stats.bytes_indexed = Some(updated);
cur.prom.bytes_indexed.set(updated);
} else {
let bytes_indexed = cur.stats.bytes_indexed.unwrap_or(0);
tracing::warn!(
"Unexpected u64 overflow while updating bytes_indexed value {} with {}",
bytes_indexed,
value
);
cur.stats.bytes_indexed = Some(0);
cur.prom.bytes_indexed.set(0);
}
}
pub fn set_bytes_indexed(&self, value: i64) {
let mut cur = self.stats.borrow_mut();
let value = if value < 0 {
tracing::warn!("Unexpected negative value for bytes_indexed {}", value);
0
} else {
value.unsigned_abs()
};
cur.stats.bytes_indexed = Some(value);
cur.prom.bytes_indexed.set(value);
}
pub fn update_records_indexed_by(&self, value: i64) {
let mut cur = self.stats.borrow_mut();
if let Some(updated) = cur
.stats
.records_indexed
.unwrap_or(0)
.checked_add_signed(value)
{
cur.stats.records_indexed = Some(updated);
cur.prom.records_indexed.set(updated);
} else {
let records_indexed = cur.stats.records_indexed.unwrap_or(0);
tracing::warn!(
"Unexpected u64 overflow while updating records_indexed value {} with {}",
records_indexed,
value
);
cur.stats.records_indexed = Some(0);
cur.prom.records_indexed.set(0);
}
}
pub fn set_records_indexed(&self, value: i64) {
let mut cur = self.stats.borrow_mut();
let value = if value < 0 {
tracing::warn!("Unexpected negative value for records_indexed {}", value);
0
} else {
value.unsigned_abs()
};
cur.stats.records_indexed = Some(value);
cur.prom.records_indexed.set(value);
}
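    /// Marks the rehydration latency gauge as initialized (with no value
    /// recorded yet), which allows `snapshot` to start returning records
    /// before rehydration finishes.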
pub fn initialize_rehydration_latency_ms(&self) {
let mut cur = self.stats.borrow_mut();
cur.stats.rehydration_latency_ms = Some(None);
}
pub fn update_envelope_state_tombstones_by(&self, value: i64) {
let mut cur = self.stats.borrow_mut();
if let Some(updated) = cur
.stats
.envelope_state_tombstones
.checked_add_signed(value)
{
cur.stats.envelope_state_tombstones = updated;
cur.prom.envelope_state_tombstones.set(updated);
} else {
let envelope_state_tombstones = cur.stats.envelope_state_tombstones;
tracing::warn!(
"Unexpected u64 overflow while updating envelope_state_tombstones value {} with {}",
envelope_state_tombstones,
value
);
cur.stats.envelope_state_tombstones = 0;
cur.prom.envelope_state_tombstones.set(0);
}
}
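    /// Records the rehydration latency: the time from the creation of this
    /// object until the given upper first advanced past the resume upper.
    /// Only the first such measurement is kept.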
pub fn update_rehydration_latency_ms(&self, upper: &Antichain<Timestamp>) {
let mut cur = self.stats.borrow_mut();
        if matches!(cur.stats.rehydration_latency_ms, Some(Some(_))) {
            // The latency has already been recorded; it is only reported once.
            return;
        }
        if !PartialOrder::less_than(&self.meta.resume_upper, upper) {
            // Rehydration is not yet complete: the upper has not advanced
            // past the resume upper.
            return;
        }
let elapsed = self.meta.created_at.elapsed();
let value = elapsed
.as_millis()
.try_into()
.expect("Rehydration took more than ~584 million years!");
cur.stats.rehydration_latency_ms = Some(Some(value));
cur.prom.rehydration_latency_ms.set(value);
}
pub fn set_offset_known(&self, value: u64) {
let mut cur = self.stats.borrow_mut();
cur.stats.offset_known = Some(Some(value));
cur.prom.offset_known.set(value);
}
pub fn set_offset_committed(&self, value: u64) {
let mut cur = self.stats.borrow_mut();
cur.stats.offset_committed = Some(Some(value));
cur.prom.offset_committed.set(value);
}
pub fn set_snapshot_records_known(&self, value: u64) {
let mut cur = self.stats.borrow_mut();
cur.stats.snapshot_records_known = Some(Some(value));
cur.prom.snapshot_records_known.set(value);
}
pub fn set_snapshot_records_staged(&self, value: u64) {
let mut cur = self.stats.borrow_mut();
cur.stats.snapshot_records_staged = Some(Some(value));
cur.prom.snapshot_records_staged.set(value);
}
}
impl SinkStatistics {
pub(crate) fn new(id: GlobalId, worker_id: usize, metrics: &SinkStatisticsMetricDefs) -> Self {
Self {
stats: Rc::new(RefCell::new(StatsInner {
stats: SinkStatisticsRecord {
id,
worker_id,
messages_staged: 0,
messages_committed: 0,
bytes_staged: 0,
bytes_committed: 0,
},
prom: SinkStatisticsMetrics::new(metrics, id, worker_id),
})),
meta: (),
}
}
pub fn clear(&self) {
let mut cur = self.stats.borrow_mut();
        cur.stats.clear();
}
    pub fn snapshot(&self) -> Option<SinkStatisticsRecord> {
        let mut cur = self.stats.borrow_mut();
        // Unlike sources, sinks have no gauges that must be initialized
        // before reporting, so a snapshot is always available.
        let ret = Some(cur.stats.clone());
        cur.stats.reset_counters();
        ret
    }
    pub fn inc_messages_staged_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.messages_staged += value;
        cur.prom.messages_staged.inc_by(value);
    }
    pub fn inc_bytes_staged_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.bytes_staged += value;
        cur.prom.bytes_staged.inc_by(value);
    }
    pub fn inc_messages_committed_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.messages_committed += value;
        cur.prom.messages_committed.inc_by(value);
    }
    pub fn inc_bytes_committed_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.bytes_committed += value;
        cur.prom.bytes_committed.inc_by(value);
    }
}
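/// Worker-local state for the statistics of all sources and sinks running on
/// a timely worker, plus, on worker 0 only, the aggregation state for the
/// whole cluster.
///
/// Maps are keyed by object id; the `usize` in each value is that object's
/// current statistics epoch, bumped whenever the statistics are
/// re-initialized so that records emitted under older epochs are discarded.
///
/// A minimal usage sketch (identifiers and the driving loop are illustrative,
/// not an exact trace of the calling code):
///
/// ```ignore
/// let mut agg = AggregatedStatistics::new(worker_id, worker_count);
/// agg.initialize_source(id, || /* build a `SourceStatistics` */);
///
/// // Periodically, non-zero workers ship their local records to worker 0...
/// let (sources, sinks) = agg.emit_local();
///
/// // ...and worker 0 folds in records from all workers, emitting
/// // cluster-wide updates once every worker has reported:
/// agg.ingest(sources, sinks);
/// let (source_updates, sink_updates) = agg.snapshot();
/// ```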
pub struct AggregatedStatistics {
worker_id: usize,
worker_count: usize,
local_source_statistics: BTreeMap<GlobalId, (usize, SourceStatistics)>,
local_sink_statistics: BTreeMap<GlobalId, (usize, SinkStatistics)>,
global_source_statistics: BTreeMap<GlobalId, (usize, Vec<Option<SourceStatisticsUpdate>>)>,
global_sink_statistics: BTreeMap<GlobalId, (usize, Vec<Option<SinkStatisticsUpdate>>)>,
}
impl AggregatedStatistics {
pub fn new(worker_id: usize, worker_count: usize) -> Self {
AggregatedStatistics {
worker_id,
worker_count,
local_source_statistics: Default::default(),
local_sink_statistics: Default::default(),
global_source_statistics: Default::default(),
global_sink_statistics: Default::default(),
}
}
pub fn get_source(&self, id: &GlobalId) -> Option<&SourceStatistics> {
self.local_source_statistics.get(id).map(|(_, s)| s)
}
pub fn get_sink(&self, id: &GlobalId) -> Option<&SinkStatistics> {
self.local_sink_statistics.get(id).map(|(_, s)| s)
}
pub fn deinitialize(&mut self, id: GlobalId) {
self.local_source_statistics.remove(&id);
self.local_sink_statistics.remove(&id);
self.global_source_statistics.remove(&id);
self.global_sink_statistics.remove(&id);
}
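    /// Bumps the global epoch for the given object and discards any partially
    /// aggregated state, so that in-flight records from older epochs are
    /// ignored. Only meaningful on worker 0, which holds the global state.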
pub fn advance_global_epoch(&mut self, id: GlobalId) {
if self.worker_id == 0 {
self.global_source_statistics
.entry(id)
.and_modify(|(ref mut epoch, ref mut stats)| {
*epoch += 1;
*stats = vec![None; self.worker_count];
});
self.global_sink_statistics
.entry(id)
.and_modify(|(ref mut epoch, ref mut stats)| {
*epoch += 1;
*stats = vec![None; self.worker_count];
});
}
}
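    /// Registers statistics for a source. If statistics already exist (e.g.
    /// because the source restarted), the local epoch is bumped and the
    /// record is reset. Worker 0 additionally prepares a slot for each
    /// worker's reports in the global state.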
pub fn initialize_source<F: FnOnce() -> SourceStatistics>(&mut self, id: GlobalId, stats: F) {
self.local_source_statistics
.entry(id)
.and_modify(|(ref mut epoch, ref mut stats)| {
*epoch += 1;
stats.clear()
})
.or_insert_with(|| (0, stats()));
if self.worker_id == 0 {
self.global_source_statistics
.entry(id)
.or_insert_with(|| (0, vec![None; self.worker_count]));
}
}
pub fn initialize_sink<F: FnOnce() -> SinkStatistics>(&mut self, id: GlobalId, stats: F) {
self.local_sink_statistics
.entry(id)
.and_modify(|(ref mut epoch, ref mut stats)| {
*epoch += 1;
stats.clear()
})
.or_insert_with(|| (0, stats()));
if self.worker_id == 0 {
self.global_sink_statistics
.entry(id)
.or_insert_with(|| (0, vec![None; self.worker_count]));
}
}
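    /// Ingests statistics records reported by workers into the global
    /// aggregation state. A no-op on workers other than worker 0. Records
    /// from before an object's current global epoch are discarded.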
pub fn ingest(
&mut self,
source_statistics: Vec<(usize, SourceStatisticsRecord)>,
sink_statistics: Vec<(usize, SinkStatisticsRecord)>,
) {
if self.worker_id == 0 {
for (epoch, stat) in source_statistics {
self.global_source_statistics.entry(stat.id).and_modify(
|(global_epoch, ref mut stats)| {
if epoch >= *global_epoch {
match &mut stats[stat.worker_id] {
None => {
stats[stat.worker_id] = Some(stat.as_update());
}
Some(occupied) => occupied.incorporate(stat.as_update()),
}
}
},
);
}
for (epoch, stat) in sink_statistics {
self.global_sink_statistics.entry(stat.id).and_modify(
|(global_epoch, ref mut stats)| {
if epoch >= *global_epoch {
match &mut stats[stat.worker_id] {
None => {
stats[stat.worker_id] = Some(stat.as_update());
}
Some(occupied) => occupied.incorporate(stat.as_update()),
}
}
},
);
}
}
}
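    /// Extracts and resets this worker's local statistics, regardless of
    /// worker id.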
fn _emit_local(
&mut self,
) -> (
Vec<(usize, SourceStatisticsRecord)>,
Vec<(usize, SinkStatisticsRecord)>,
) {
let sources = self
.local_source_statistics
.values_mut()
.flat_map(|(epoch, s)| s.snapshot().map(|v| (*epoch, v)))
.collect();
let sinks = self
.local_sink_statistics
.values_mut()
.flat_map(|(epoch, s)| s.snapshot().map(|v| (*epoch, v)))
.collect();
(sources, sinks)
}
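    /// Extracts and resets this worker's local statistics so they can be
    /// shipped to worker 0. Returns nothing on worker 0 itself, whose local
    /// statistics are folded in directly by `snapshot`.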
pub fn emit_local(
&mut self,
) -> (
Vec<(usize, SourceStatisticsRecord)>,
Vec<(usize, SinkStatisticsRecord)>,
) {
if self.worker_id == 0 {
return (Vec::new(), Vec::new());
}
self._emit_local()
}
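    /// On worker 0, produces consolidated updates for every object for which
    /// all workers have reported since the last epoch bump, resetting the
    /// aggregated counters afterwards. Returns nothing on other workers.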
pub fn snapshot(&mut self) -> (Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>) {
        if self.worker_id != 0 {
return (Vec::new(), Vec::new());
}
let (sources, sinks) = self._emit_local();
self.ingest(sources, sinks);
let sources = self
.global_source_statistics
.iter_mut()
.filter_map(|(_, (_, s))| {
if s.iter().all(|s| s.is_some()) {
let ret = Some(SourceStatisticsUpdate::summarize(|| {
s.iter().filter_map(Option::as_ref)
}));
s.iter_mut().for_each(|s| {
if let Some(s) = s {
s.reset_counters();
}
});
ret
} else {
None
}
})
.collect();
let sinks = self
.global_sink_statistics
.iter_mut()
.filter_map(|(_, (_, s))| {
if s.iter().all(|s| s.is_some()) {
let ret = Some(SinkStatisticsUpdate::summarize(|| {
s.iter().filter_map(Option::as_ref)
}));
s.iter_mut().for_each(|s| {
if let Some(s) = s {
s.reset_counters();
}
});
ret
} else {
None
}
})
.collect();
(sources, sinks)
}
}