1use std::cell::RefCell;
26use std::collections::BTreeMap;
27use std::rc::Rc;
28use std::time::Instant;
29
30use mz_ore::metric;
31use mz_ore::metrics::{
32 DeleteOnDropCounter, DeleteOnDropGauge, IntCounterVec, IntGaugeVec, MetricVisibility,
33 MetricsRegistry, UIntGaugeVec,
34};
35use mz_repr::{GlobalId, Timestamp};
36use mz_storage_client::statistics::{Gauge, SinkStatisticsUpdate, SourceStatisticsUpdate};
37use mz_storage_types::sources::SourceEnvelope;
38use prometheus::core::{AtomicI64, AtomicU64};
39use serde::{Deserialize, Serialize};
40use timely::PartialOrder;
41use timely::progress::frontier::Antichain;
42
43#[derive(Clone, Debug)]
47pub(crate) struct SourceStatisticsMetricDefs {
48 pub(crate) messages_received: IntCounterVec,
50 pub(crate) updates_staged: IntCounterVec,
51 pub(crate) updates_committed: IntCounterVec,
52 pub(crate) bytes_received: IntCounterVec,
53
54 pub(crate) snapshot_committed: UIntGaugeVec,
56 pub(crate) bytes_indexed: UIntGaugeVec,
57 pub(crate) records_indexed: UIntGaugeVec,
58 pub(crate) rehydration_latency_ms: IntGaugeVec,
59
60 pub(crate) offset_known: UIntGaugeVec,
61 pub(crate) offset_committed: UIntGaugeVec,
62 pub(crate) snapshot_records_known: UIntGaugeVec,
63 pub(crate) snapshot_records_staged: UIntGaugeVec,
64
65 pub(crate) envelope_state_tombstones: UIntGaugeVec,
67}
68
69impl SourceStatisticsMetricDefs {
70 pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
71 Self {
72 snapshot_committed: registry.register(metric!(
73 name: "mz_source_snapshot_committed",
74 help: "Whether or not the worker has committed the initial snapshot for a source.",
75 var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
76 )),
77 messages_received: registry.register(metric!(
78 name: "mz_source_messages_received",
79 help: "The number of raw messages the worker has received from upstream.",
80 var_labels: ["source_id", "worker_id", "parent_source_id"],
81 visibility: MetricVisibility::Public,
82 )),
83 updates_staged: registry.register(metric!(
84 name: "mz_source_updates_staged",
85 help: "The number of updates (inserts + deletes) the worker has written but not yet committed to the storage layer.",
86 var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
87 )),
88 updates_committed: registry.register(metric!(
89 name: "mz_source_updates_committed",
90 help: "The number of updates (inserts + deletes) the worker has committed into the storage layer.",
91 var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
92 )),
93 bytes_received: registry.register(metric!(
94 name: "mz_source_bytes_received",
95 help: "The number of bytes worth of messages the worker has received from upstream. The way the bytes are counted is source-specific.",
96 var_labels: ["source_id", "worker_id", "parent_source_id"],
97 visibility: MetricVisibility::Public,
98 )),
99 bytes_indexed: registry.register(metric!(
100 name: "mz_source_bytes_indexed",
101 help: "The number of bytes of the source envelope state kept. This will be specific to the envelope in use.",
102 var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
103 )),
104 records_indexed: registry.register(metric!(
105 name: "mz_source_records_indexed",
106 help: "The number of records in the source envelope state. This will be specific to the envelope in use",
107 var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
108 )),
109 envelope_state_tombstones: registry.register(metric!(
110 name: "mz_source_envelope_state_tombstones",
111 help: "The number of outstanding tombstones in the source envelope state. This will be specific to the envelope in use",
112 var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
113 )),
114 rehydration_latency_ms: registry.register(metric!(
115 name: "mz_source_rehydration_latency_ms",
116 help: "The amount of time in milliseconds it took for the worker to rehydrate the source envelope state. This will be specific to the envelope in use.",
117 var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id", "envelope"],
118 )),
119 offset_known: registry.register(metric!(
120 name: "mz_source_offset_known",
121 help: "The total number of _values_ (source-defined unit) present in upstream.",
122 var_labels: ["source_id", "worker_id", "shard_id"],
123 visibility: MetricVisibility::Public,
124 )),
125 offset_committed: registry.register(metric!(
126 name: "mz_source_offset_committed",
127 help: "The total number of _values_ (source-defined unit) we have fully processed, and storage and committed.",
128 var_labels: ["source_id", "worker_id", "shard_id"],
129 visibility: MetricVisibility::Public,
130 )),
131 snapshot_records_known: registry.register(metric!(
132 name: "mz_source_snapshot_records_known",
133 help: "The total number of records in the source's snapshot",
134 var_labels: ["source_id", "worker_id", "shard_id", "parent_source_id"],
135 )),
136 snapshot_records_staged: registry.register(metric!(
137 name: "mz_source_snapshot_records_staged",
138 help: "The total number of records read from the source's snapshot",
139 var_labels: ["source_id", "worker_id", "shard_id", "parent_source_id"],
140 )),
141 }
142 }
143}
144
145#[derive(Debug)]
147pub struct SourceStatisticsMetrics {
148 pub(crate) messages_received: DeleteOnDropCounter<AtomicU64, Vec<String>>,
150 pub(crate) updates_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
151 pub(crate) updates_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
152 pub(crate) bytes_received: DeleteOnDropCounter<AtomicU64, Vec<String>>,
153
154 pub(crate) snapshot_committed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
156 pub(crate) bytes_indexed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
157 pub(crate) records_indexed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
158 pub(crate) rehydration_latency_ms: DeleteOnDropGauge<AtomicI64, Vec<String>>,
159
160 pub(crate) offset_known: DeleteOnDropGauge<AtomicU64, Vec<String>>,
161 pub(crate) offset_committed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
162 pub(crate) snapshot_records_known: DeleteOnDropGauge<AtomicU64, Vec<String>>,
163 pub(crate) snapshot_records_staged: DeleteOnDropGauge<AtomicU64, Vec<String>>,
164
165 pub(crate) envelope_state_tombstones: DeleteOnDropGauge<AtomicU64, Vec<String>>,
167}
168
169impl SourceStatisticsMetrics {
170 pub(crate) fn new(
171 defs: &SourceStatisticsMetricDefs,
172 id: GlobalId,
173 worker_id: usize,
174 parent_source_id: GlobalId,
175 shard_id: &mz_persist_client::ShardId,
176 envelope: SourceEnvelope,
177 ) -> SourceStatisticsMetrics {
178 let shard = shard_id.to_string();
179 let envelope = match envelope {
180 SourceEnvelope::None(_) => "none",
181 SourceEnvelope::Upsert(_) => "upsert",
182 SourceEnvelope::CdcV2 => "cdcv2",
183 };
184
185 SourceStatisticsMetrics {
186 snapshot_committed: defs.snapshot_committed.get_delete_on_drop_metric(vec![
187 id.to_string(),
188 worker_id.to_string(),
189 parent_source_id.to_string(),
190 shard.clone(),
191 ]),
192 messages_received: defs.messages_received.get_delete_on_drop_metric(vec![
193 id.to_string(),
194 worker_id.to_string(),
195 parent_source_id.to_string(),
196 ]),
197 updates_staged: defs.updates_staged.get_delete_on_drop_metric(vec![
198 id.to_string(),
199 worker_id.to_string(),
200 parent_source_id.to_string(),
201 shard.clone(),
202 ]),
203 updates_committed: defs.updates_committed.get_delete_on_drop_metric(vec![
204 id.to_string(),
205 worker_id.to_string(),
206 parent_source_id.to_string(),
207 shard.clone(),
208 ]),
209 bytes_received: defs.bytes_received.get_delete_on_drop_metric(vec![
210 id.to_string(),
211 worker_id.to_string(),
212 parent_source_id.to_string(),
213 ]),
214 bytes_indexed: defs.bytes_indexed.get_delete_on_drop_metric(vec![
215 id.to_string(),
216 worker_id.to_string(),
217 parent_source_id.to_string(),
218 shard.clone(),
219 ]),
220 records_indexed: defs.records_indexed.get_delete_on_drop_metric(vec![
221 id.to_string(),
222 worker_id.to_string(),
223 parent_source_id.to_string(),
224 shard.clone(),
225 ]),
226 envelope_state_tombstones: defs.envelope_state_tombstones.get_delete_on_drop_metric(
227 vec![
228 id.to_string(),
229 worker_id.to_string(),
230 parent_source_id.to_string(),
231 shard.clone(),
232 ],
233 ),
234 rehydration_latency_ms: defs.rehydration_latency_ms.get_delete_on_drop_metric(vec![
235 id.to_string(),
236 worker_id.to_string(),
237 parent_source_id.to_string(),
238 shard.clone(),
239 envelope.to_string(),
240 ]),
241 offset_known: defs.offset_known.get_delete_on_drop_metric(vec![
242 id.to_string(),
243 worker_id.to_string(),
244 shard.clone(),
245 ]),
246 offset_committed: defs.offset_committed.get_delete_on_drop_metric(vec![
247 id.to_string(),
248 worker_id.to_string(),
249 shard.clone(),
250 ]),
251 snapshot_records_known: defs.snapshot_records_known.get_delete_on_drop_metric(vec![
252 id.to_string(),
253 worker_id.to_string(),
254 shard.clone(),
255 parent_source_id.to_string(),
256 ]),
257 snapshot_records_staged: defs.snapshot_records_staged.get_delete_on_drop_metric(vec![
258 id.to_string(),
259 worker_id.to_string(),
260 shard.clone(),
261 parent_source_id.to_string(),
262 ]),
263 }
264 }
265}
266
267#[derive(Clone, Debug)]
268pub(crate) struct SinkStatisticsMetricDefs {
269 pub(crate) messages_staged: IntCounterVec,
271 pub(crate) messages_committed: IntCounterVec,
272 pub(crate) bytes_staged: IntCounterVec,
273 pub(crate) bytes_committed: IntCounterVec,
274}
275
276impl SinkStatisticsMetricDefs {
277 pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
278 Self {
279 messages_staged: registry.register(metric!(
280 name: "mz_sink_messages_staged",
281 help: "The number of messages staged but possibly not committed to the sink.",
282 var_labels: ["sink_id", "worker_id"],
283 )),
284 messages_committed: registry.register(metric!(
285 name: "mz_sink_messages_committed",
286 help: "The number of messages committed to the sink.",
287 var_labels: ["sink_id", "worker_id"],
288 )),
289 bytes_staged: registry.register(metric!(
290 name: "mz_sink_bytes_staged",
291 help: "The number of bytes staged but possibly not committed to the sink.",
292 var_labels: ["sink_id", "worker_id"],
293 visibility: MetricVisibility::Public,
294 )),
295 bytes_committed: registry.register(metric!(
296 name: "mz_sink_bytes_committed",
297 help: "The number of bytes committed to the sink.",
298 var_labels: ["sink_id", "worker_id"],
299 visibility: MetricVisibility::Public,
300 )),
301 }
302 }
303}
304
305#[derive(Debug)]
307pub struct SinkStatisticsMetrics {
308 pub(crate) messages_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
310 pub(crate) messages_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
311 pub(crate) bytes_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
312 pub(crate) bytes_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
313}
314
315impl SinkStatisticsMetrics {
316 pub(crate) fn new(
317 defs: &SinkStatisticsMetricDefs,
318 id: GlobalId,
319 worker_id: usize,
320 ) -> SinkStatisticsMetrics {
321 SinkStatisticsMetrics {
322 messages_staged: defs
323 .messages_staged
324 .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
325 messages_committed: defs
326 .messages_committed
327 .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
328 bytes_staged: defs
329 .bytes_staged
330 .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
331 bytes_committed: defs
332 .bytes_committed
333 .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
334 }
335 }
336}
337
338#[derive(Debug, Clone)]
340pub struct SourceStatisticsMetadata {
341 resume_upper: Antichain<Timestamp>,
343 created_at: Instant,
345}
346
347impl SourceStatisticsMetadata {
348 pub fn new(resume_upper: Antichain<Timestamp>) -> Self {
350 Self {
351 resume_upper,
352 created_at: Instant::now(),
353 }
354 }
355}
356
/// The mutable state behind a `StorageStatistics` handle: the statistics
/// record paired with its prometheus metrics.
#[derive(Debug)]
struct StatsInner<Stats, Metrics> {
    stats: Stats,
    prom: Metrics,
}

/// A handle to a statistics record and its prometheus metrics, plus
/// per-handle metadata.
///
/// The record and metrics sit behind an `Rc<RefCell<..>>`, so clones of a
/// handle observe and mutate the same state, while `meta` is owned by each
/// handle individually.
#[derive(Debug)]
pub struct StorageStatistics<Stats, Metrics, Meta> {
    /// State shared by all clones of this handle.
    stats: Rc<RefCell<StatsInner<Stats, Metrics>>>,
    /// Metadata owned by this particular handle.
    meta: Meta,
}

// Implemented by hand so that only `Meta` needs to be `Clone`; the shared
// state is never deep-copied.
impl<Stats, Metrics, Meta: Clone> Clone for StorageStatistics<Stats, Metrics, Meta> {
    fn clone(&self) -> Self {
        let Self { stats, meta } = self;
        Self {
            stats: Rc::clone(stats),
            meta: meta.clone(),
        }
    }
}
392
393#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
395pub struct SourceStatisticsRecord {
396 id: GlobalId,
397 worker_id: usize,
398 messages_received: u64,
400 bytes_received: u64,
401 updates_staged: u64,
402 updates_committed: u64,
403
404 records_indexed: Option<u64>,
407 bytes_indexed: Option<u64>,
408
409 rehydration_latency_ms: Option<Option<i64>>,
411
412 snapshot_records_known: Option<Option<u64>>,
415 snapshot_records_staged: Option<Option<u64>>,
416 snapshot_committed: Option<bool>,
417 offset_known: Option<Option<u64>>,
418 offset_committed: Option<Option<u64>>,
419
420 envelope_state_tombstones: u64,
422}
423
424impl SourceStatisticsRecord {
425 fn reset_gauges(&mut self) {
426 self.rehydration_latency_ms = None;
429 self.snapshot_committed = None;
430
431 self.bytes_indexed = Some(0);
433 self.records_indexed = Some(0);
434
435 self.snapshot_records_known = Some(None);
437 self.snapshot_records_staged = Some(None);
438 self.offset_known = Some(None);
439 self.offset_committed = Some(None);
440
441 self.envelope_state_tombstones = 0;
442 }
443
444 fn reset_counters(&mut self) {
446 self.messages_received = 0;
447 self.bytes_received = 0;
448 self.updates_staged = 0;
449 self.updates_committed = 0;
450 }
451
452 fn as_update(&self) -> SourceStatisticsUpdate {
455 let SourceStatisticsRecord {
456 id,
457 worker_id: _,
458 messages_received,
459 bytes_received,
460 updates_staged,
461 updates_committed,
462 records_indexed,
463 bytes_indexed,
464 rehydration_latency_ms,
465 snapshot_records_known,
466 snapshot_records_staged,
467 snapshot_committed,
468 offset_known,
469 offset_committed,
470 envelope_state_tombstones: _,
471 } = self.clone();
472
473 SourceStatisticsUpdate {
474 id,
475 messages_received: messages_received.into(),
476 bytes_received: bytes_received.into(),
477 updates_staged: updates_staged.into(),
478 updates_committed: updates_committed.into(),
479 records_indexed: Gauge::gauge(records_indexed.unwrap()),
480 bytes_indexed: Gauge::gauge(bytes_indexed.unwrap()),
481 rehydration_latency_ms: Gauge::gauge(rehydration_latency_ms.unwrap()),
482 snapshot_records_known: Gauge::gauge(snapshot_records_known.unwrap()),
483 snapshot_records_staged: Gauge::gauge(snapshot_records_staged.unwrap()),
484 snapshot_committed: Gauge::gauge(snapshot_committed.unwrap()),
485 offset_known: Gauge::gauge(offset_known.unwrap()),
486 offset_committed: Gauge::gauge(offset_committed.unwrap()),
487 }
488 }
489}
490
491#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
493pub struct SinkStatisticsRecord {
494 id: GlobalId,
495 worker_id: usize,
496 messages_staged: u64,
498 messages_committed: u64,
499 bytes_staged: u64,
500 bytes_committed: u64,
501}
502
503impl SinkStatisticsRecord {
504 fn reset_gauges(&self) {}
505
506 fn reset_counters(&mut self) {
508 self.messages_staged = 0;
509 self.messages_committed = 0;
510 self.bytes_staged = 0;
511 self.bytes_committed = 0;
512 }
513
514 fn as_update(&self) -> SinkStatisticsUpdate {
517 let SinkStatisticsRecord {
518 id,
519 worker_id: _,
520 messages_staged,
521 messages_committed,
522 bytes_staged,
523 bytes_committed,
524 } = self.clone();
525
526 SinkStatisticsUpdate {
527 id,
528 messages_staged: messages_staged.into(),
529 messages_committed: messages_committed.into(),
530 bytes_staged: bytes_staged.into(),
531 bytes_committed: bytes_committed.into(),
532 }
533 }
534}
535
/// Statistics handle for a source: a `SourceStatisticsRecord` paired with its
/// prometheus metrics, plus `SourceStatisticsMetadata` per handle.
pub type SourceStatistics =
    StorageStatistics<SourceStatisticsRecord, SourceStatisticsMetrics, SourceStatisticsMetadata>;

/// Statistics handle for a sink: a `SinkStatisticsRecord` paired with its
/// prometheus metrics. Sinks carry no per-handle metadata.
pub type SinkStatistics = StorageStatistics<SinkStatisticsRecord, SinkStatisticsMetrics, ()>;
542
543impl SourceStatistics {
544 pub(crate) fn new(
545 id: GlobalId,
546 worker_id: usize,
547 metrics: &SourceStatisticsMetricDefs,
548 parent_source_id: GlobalId,
549 shard_id: &mz_persist_client::ShardId,
550 envelope: SourceEnvelope,
551 resume_upper: Antichain<Timestamp>,
552 ) -> Self {
553 Self {
554 stats: Rc::new(RefCell::new(StatsInner {
555 stats: SourceStatisticsRecord {
556 id,
557 worker_id,
558 messages_received: 0,
559 updates_staged: 0,
560 updates_committed: 0,
561 bytes_received: 0,
562 records_indexed: Some(0),
563 bytes_indexed: Some(0),
564 rehydration_latency_ms: None,
565 snapshot_records_staged: Some(None),
566 snapshot_records_known: Some(None),
567 snapshot_committed: None,
568 offset_known: Some(None),
569 offset_committed: Some(None),
570 envelope_state_tombstones: 0,
571 },
572 prom: SourceStatisticsMetrics::new(
573 metrics,
574 id,
575 worker_id,
576 parent_source_id,
577 shard_id,
578 envelope,
579 ),
580 })),
581 meta: SourceStatisticsMetadata::new(resume_upper),
582 }
583 }
584
585 pub fn reset_gauges(&self) {
588 let mut cur = self.stats.borrow_mut();
589 cur.stats.reset_gauges();
590 }
591
592 pub fn snapshot(&self) -> Option<SourceStatisticsRecord> {
596 let mut cur = self.stats.borrow_mut();
597
598 match &cur.stats {
599 SourceStatisticsRecord {
600 records_indexed: Some(_),
601 bytes_indexed: Some(_),
602 rehydration_latency_ms: Some(_),
603 snapshot_records_known: Some(_),
604 snapshot_records_staged: Some(_),
605 snapshot_committed: Some(_),
606 offset_known: Some(_),
607 offset_committed: Some(_),
608 ..
609 } => {
610 let ret = Some(cur.stats.clone());
611 cur.stats.reset_counters();
612 ret
613 }
614 _ => None,
615 }
616 }
617
618 pub fn initialize_snapshot_committed(&self, upper: &Antichain<Timestamp>) {
625 self.update_snapshot_committed(upper);
626 }
627
628 pub fn update_snapshot_committed(&self, upper: &Antichain<Timestamp>) {
630 let value = *upper != Antichain::from_elem(Timestamp::MIN);
631 let mut cur = self.stats.borrow_mut();
632 cur.stats.snapshot_committed = Some(value);
633 cur.prom.snapshot_committed.set(if value { 1 } else { 0 });
634 }
635
636 pub fn inc_messages_received_by(&self, value: u64) {
638 let mut cur = self.stats.borrow_mut();
639 cur.stats.messages_received = cur.stats.messages_received + value;
640 cur.prom.messages_received.inc_by(value);
641 }
642
643 pub fn inc_updates_staged_by(&self, value: u64) {
645 let mut cur = self.stats.borrow_mut();
646 cur.stats.updates_staged = cur.stats.updates_staged + value;
647 cur.prom.updates_staged.inc_by(value);
648 }
649
650 pub fn inc_updates_committed_by(&self, value: u64) {
652 let mut cur = self.stats.borrow_mut();
653 cur.stats.updates_committed = cur.stats.updates_committed + value;
654 cur.prom.updates_committed.inc_by(value);
655 }
656
657 pub fn inc_bytes_received_by(&self, value: u64) {
659 let mut cur = self.stats.borrow_mut();
660 cur.stats.bytes_received = cur.stats.bytes_received + value;
661 cur.prom.bytes_received.inc_by(value);
662 }
663
664 pub fn update_bytes_indexed_by(&self, value: i64) {
667 let mut cur = self.stats.borrow_mut();
668 if let Some(updated) = cur
669 .stats
670 .bytes_indexed
671 .unwrap_or(0)
672 .checked_add_signed(value)
673 {
674 cur.stats.bytes_indexed = Some(updated);
675 cur.prom.bytes_indexed.set(updated);
676 } else {
677 let bytes_indexed = cur.stats.bytes_indexed.unwrap_or(0);
678 tracing::warn!(
679 "Unexpected u64 overflow while updating bytes_indexed value {} with {}",
680 bytes_indexed,
681 value
682 );
683 cur.stats.bytes_indexed = Some(0);
684 cur.prom.bytes_indexed.set(0);
685 }
686 }
687
688 pub fn set_bytes_indexed(&self, value: i64) {
690 let mut cur = self.stats.borrow_mut();
691 let value = if value < 0 {
692 tracing::warn!("Unexpected negative value for bytes_indexed {}", value);
693 0
694 } else {
695 value.unsigned_abs()
696 };
697 cur.stats.bytes_indexed = Some(value);
698 cur.prom.bytes_indexed.set(value);
699 }
700
701 pub fn update_records_indexed_by(&self, value: i64) {
704 let mut cur = self.stats.borrow_mut();
705 if let Some(updated) = cur
706 .stats
707 .records_indexed
708 .unwrap_or(0)
709 .checked_add_signed(value)
710 {
711 cur.stats.records_indexed = Some(updated);
712 cur.prom.records_indexed.set(updated);
713 } else {
714 let records_indexed = cur.stats.records_indexed.unwrap_or(0);
715 tracing::warn!(
716 "Unexpected u64 overflow while updating records_indexed value {} with {}",
717 records_indexed,
718 value
719 );
720 cur.stats.records_indexed = Some(0);
721 cur.prom.records_indexed.set(0);
722 }
723 }
724
725 pub fn set_records_indexed(&self, value: i64) {
727 let mut cur = self.stats.borrow_mut();
728 let value = if value < 0 {
729 tracing::warn!("Unexpected negative value for records_indexed {}", value);
730 0
731 } else {
732 value.unsigned_abs()
733 };
734 cur.stats.records_indexed = Some(value);
735 cur.prom.records_indexed.set(value);
736 }
737
738 pub fn initialize_rehydration_latency_ms(&self) {
740 let mut cur = self.stats.borrow_mut();
741 cur.stats.rehydration_latency_ms = Some(None);
742 }
743
744 pub fn update_envelope_state_tombstones_by(&self, value: i64) {
748 let mut cur = self.stats.borrow_mut();
749 if let Some(updated) = cur
750 .stats
751 .envelope_state_tombstones
752 .checked_add_signed(value)
753 {
754 cur.stats.envelope_state_tombstones = updated;
755 cur.prom.envelope_state_tombstones.set(updated);
756 } else {
757 let envelope_state_tombstones = cur.stats.envelope_state_tombstones;
758 tracing::warn!(
759 "Unexpected u64 overflow while updating envelope_state_tombstones value {} with {}",
760 envelope_state_tombstones,
761 value
762 );
763 cur.stats.envelope_state_tombstones = 0;
764 cur.prom.envelope_state_tombstones.set(0);
765 }
766 }
767
768 pub fn update_rehydration_latency_ms(&self, upper: &Antichain<Timestamp>) {
770 let mut cur = self.stats.borrow_mut();
771
772 if matches!(cur.stats.rehydration_latency_ms, Some(Some(_))) {
773 return; }
775 if !PartialOrder::less_than(&self.meta.resume_upper, upper) {
776 return; }
778
779 let elapsed = self.meta.created_at.elapsed();
780 let value = elapsed
781 .as_millis()
782 .try_into()
783 .expect("Rehydration took more than ~584 million years!");
784 cur.stats.rehydration_latency_ms = Some(Some(value));
785 cur.prom.rehydration_latency_ms.set(value);
786 }
787
788 pub fn set_offset_known(&self, value: u64) {
790 let mut cur = self.stats.borrow_mut();
791 cur.stats.offset_known = Some(Some(value));
792 cur.prom.offset_known.set(value);
793 }
794
795 pub fn set_offset_committed(&self, value: u64) {
797 let mut cur = self.stats.borrow_mut();
798 cur.stats.offset_committed = Some(Some(value));
799 cur.prom.offset_committed.set(value);
800 }
801
802 pub fn set_snapshot_records_known(&self, value: u64) {
804 let mut cur = self.stats.borrow_mut();
805 cur.stats.snapshot_records_known = Some(Some(value));
806 cur.prom.snapshot_records_known.set(value);
807 }
808
809 pub fn set_snapshot_records_staged(&self, value: u64) {
811 let mut cur = self.stats.borrow_mut();
812 cur.stats.snapshot_records_staged = Some(Some(value));
813 cur.prom.snapshot_records_staged.set(value);
814 }
815}
816
817impl SinkStatistics {
818 pub(crate) fn new(id: GlobalId, worker_id: usize, metrics: &SinkStatisticsMetricDefs) -> Self {
819 Self {
820 stats: Rc::new(RefCell::new(StatsInner {
821 stats: SinkStatisticsRecord {
822 id,
823 worker_id,
824 messages_staged: 0,
825 messages_committed: 0,
826 bytes_staged: 0,
827 bytes_committed: 0,
828 },
829 prom: SinkStatisticsMetrics::new(metrics, id, worker_id),
830 })),
831 meta: (),
832 }
833 }
834
835 pub fn reset_gauges(&self) {
838 let cur = self.stats.borrow_mut();
839 cur.stats.reset_gauges();
840 }
841
842 pub fn snapshot(&self) -> Option<SinkStatisticsRecord> {
846 let mut cur = self.stats.borrow_mut();
847
848 match &cur.stats {
849 SinkStatisticsRecord { .. } => {
850 let ret = Some(cur.stats.clone());
851 cur.stats.reset_counters();
852 ret
853 }
854 }
855 }
856
857 pub fn inc_messages_staged_by(&self, value: u64) {
859 let mut cur = self.stats.borrow_mut();
860 cur.stats.messages_staged = cur.stats.messages_staged + value;
861 cur.prom.messages_staged.inc_by(value);
862 }
863
864 pub fn inc_bytes_staged_by(&self, value: u64) {
866 let mut cur = self.stats.borrow_mut();
867 cur.stats.bytes_staged = cur.stats.bytes_staged + value;
868 cur.prom.bytes_staged.inc_by(value);
869 }
870
871 pub fn inc_messages_committed_by(&self, value: u64) {
873 let mut cur = self.stats.borrow_mut();
874 cur.stats.messages_committed = cur.stats.messages_committed + value;
875 cur.prom.messages_committed.inc_by(value);
876 }
877
878 pub fn inc_bytes_committed_by(&self, value: u64) {
880 let mut cur = self.stats.borrow_mut();
881 cur.stats.bytes_committed = cur.stats.bytes_committed + value;
882 cur.prom.bytes_committed.inc_by(value);
883 }
884}
885
886pub struct AggregatedStatistics {
911 worker_id: usize,
912 worker_count: usize,
913 local_source_statistics: BTreeMap<GlobalId, (usize, GlobalId, SourceStatistics)>,
914 local_sink_statistics: BTreeMap<GlobalId, (usize, SinkStatistics)>,
915
916 global_source_statistics:
917 BTreeMap<GlobalId, (usize, GlobalId, Vec<Option<SourceStatisticsUpdate>>)>,
918 global_sink_statistics: BTreeMap<GlobalId, (usize, Vec<Option<SinkStatisticsUpdate>>)>,
919}
920
921impl AggregatedStatistics {
922 pub fn new(worker_id: usize, worker_count: usize) -> Self {
924 AggregatedStatistics {
925 worker_id,
926 worker_count,
927 local_source_statistics: Default::default(),
928 local_sink_statistics: Default::default(),
929 global_source_statistics: Default::default(),
930 global_sink_statistics: Default::default(),
931 }
932 }
933
934 pub fn get_ingestion_stats(
936 &self,
937 ingestion_id: &GlobalId,
938 ) -> BTreeMap<GlobalId, SourceStatistics> {
939 let mut ingestion_stats = BTreeMap::new();
940 for (id, (_epoch, ingestion_id2, stats)) in self.local_source_statistics.iter() {
941 if ingestion_id == ingestion_id2 {
942 ingestion_stats.insert(id.clone(), stats.clone());
943 }
944 }
945 ingestion_stats
946 }
947
948 pub fn get_source(&self, id: &GlobalId) -> Option<&SourceStatistics> {
950 self.local_source_statistics.get(id).map(|(_, _, s)| s)
951 }
952
953 pub fn get_sink(&self, id: &GlobalId) -> Option<&SinkStatistics> {
955 self.local_sink_statistics.get(id).map(|(_, s)| s)
956 }
957
958 pub fn deinitialize(&mut self, id: GlobalId) {
961 self.local_source_statistics.remove(&id);
962 self.local_sink_statistics.remove(&id);
963 self.global_source_statistics.remove(&id);
964 self.global_sink_statistics.remove(&id);
965 }
966
967 pub fn advance_global_epoch(&mut self, id: GlobalId) {
972 if let Some((epoch, _ingestion_id, stats)) = self.global_source_statistics.get_mut(&id) {
973 *epoch += 1;
974 for worker_stats in stats {
975 if let Some(update) = worker_stats {
976 update.reset_gauges();
977 }
978 }
979 }
980 if let Some((epoch, stats)) = self.global_sink_statistics.get_mut(&id) {
981 *epoch += 1;
982 for worker_stats in stats {
983 if let Some(update) = worker_stats {
984 update.reset_gauges();
985 }
986 }
987 }
988 }
989
990 pub fn initialize_source<F: FnOnce() -> SourceStatistics>(
992 &mut self,
993 id: GlobalId,
994 ingestion_id: GlobalId,
995 resume_upper: Antichain<Timestamp>,
996 stats: F,
997 ) {
998 self.local_source_statistics
999 .entry(id)
1000 .and_modify(|(epoch, ingestion_id2, stats)| {
1001 assert_eq!(ingestion_id, *ingestion_id2);
1002 *epoch += 1;
1003 stats.reset_gauges();
1004 stats.meta = SourceStatisticsMetadata::new(resume_upper);
1005 })
1006 .or_insert_with(|| (0, ingestion_id, stats()));
1007
1008 if self.worker_id == 0 {
1009 self.global_source_statistics
1010 .entry(id)
1011 .or_insert_with(|| (0, ingestion_id, vec![None; self.worker_count]));
1012 }
1013 }
1014
1015 pub fn initialize_sink<F: FnOnce() -> SinkStatistics>(&mut self, id: GlobalId, stats: F) {
1017 self.local_sink_statistics
1018 .entry(id)
1019 .and_modify(|(epoch, stats)| {
1020 *epoch += 1;
1021 stats.reset_gauges();
1022 })
1023 .or_insert_with(|| (0, stats()));
1024 if self.worker_id == 0 {
1025 self.global_sink_statistics
1026 .entry(id)
1027 .or_insert_with(|| (0, vec![None; self.worker_count]));
1028 }
1029 }
1030
1031 pub fn ingest(
1033 &mut self,
1034 source_statistics: Vec<(usize, SourceStatisticsRecord)>,
1035 sink_statistics: Vec<(usize, SinkStatisticsRecord)>,
1036 ) {
1037 if self.worker_id != 0 {
1039 return;
1040 }
1041
1042 for (epoch, stat) in source_statistics {
1043 if let Some((global_epoch, _, stats)) = self.global_source_statistics.get_mut(&stat.id)
1044 {
1045 let epoch_match = epoch >= *global_epoch;
1048 let mut update = stat.as_update();
1049 match (&mut stats[stat.worker_id], epoch_match) {
1050 (None, true) => stats[stat.worker_id] = Some(update),
1051 (None, false) => {
1052 update.reset_gauges();
1053 stats[stat.worker_id] = Some(update);
1054 }
1055 (Some(occupied), true) => occupied.incorporate(update),
1056 (Some(occupied), false) => occupied.incorporate_counters(update),
1057 }
1058 }
1059 }
1060
1061 for (epoch, stat) in sink_statistics {
1062 if let Some((global_epoch, stats)) = self.global_sink_statistics.get_mut(&stat.id) {
1063 let epoch_match = epoch >= *global_epoch;
1066 let update = stat.as_update();
1067 match (&mut stats[stat.worker_id], epoch_match) {
1068 (None, true) => stats[stat.worker_id] = Some(update),
1069 (None, false) => {
1070 update.reset_gauges();
1071 stats[stat.worker_id] = Some(update);
1072 }
1073 (Some(occupied), true) => occupied.incorporate(update),
1074 (Some(occupied), false) => occupied.incorporate_counters(update),
1075 }
1076 }
1077 }
1078 }
1079
1080 fn _emit_local(
1081 &mut self,
1082 ) -> (
1083 Vec<(usize, SourceStatisticsRecord)>,
1084 Vec<(usize, SinkStatisticsRecord)>,
1085 ) {
1086 let sources = self
1087 .local_source_statistics
1088 .values_mut()
1089 .flat_map(|(epoch, _, s)| s.snapshot().map(|v| (*epoch, v)))
1090 .collect();
1091
1092 let sinks = self
1093 .local_sink_statistics
1094 .values_mut()
1095 .flat_map(|(epoch, s)| s.snapshot().map(|v| (*epoch, v)))
1096 .collect();
1097
1098 (sources, sinks)
1099 }
1100
1101 pub fn emit_local(
1103 &mut self,
1104 ) -> (
1105 Vec<(usize, SourceStatisticsRecord)>,
1106 Vec<(usize, SinkStatisticsRecord)>,
1107 ) {
1108 if self.worker_id == 0 {
1111 return (Vec::new(), Vec::new());
1112 }
1113
1114 self._emit_local()
1115 }
1116
1117 pub fn snapshot(&mut self) -> (Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>) {
1120 if !self.worker_id == 0 {
1121 return (Vec::new(), Vec::new());
1122 }
1123
1124 let (sources, sinks) = self._emit_local();
1125 self.ingest(sources, sinks);
1126
1127 let sources = self
1128 .global_source_statistics
1129 .iter_mut()
1130 .filter_map(|(_, (_, _, s))| {
1131 if s.iter().all(|s| s.is_some()) {
1132 let ret = Some(SourceStatisticsUpdate::summarize(|| {
1133 s.iter().filter_map(Option::as_ref)
1134 }));
1135
1136 s.iter_mut().for_each(|s| {
1138 if let Some(s) = s {
1139 s.reset_counters();
1140 }
1141 });
1142 ret
1143 } else {
1144 None
1145 }
1146 })
1147 .collect();
1148
1149 let sinks = self
1150 .global_sink_statistics
1151 .iter_mut()
1152 .filter_map(|(_, (_, s))| {
1153 if s.iter().all(|s| s.is_some()) {
1154 let ret = Some(SinkStatisticsUpdate::summarize(|| {
1155 s.iter().filter_map(Option::as_ref)
1156 }));
1157
1158 s.iter_mut().for_each(|s| {
1160 if let Some(s) = s {
1161 s.reset_counters();
1162 }
1163 });
1164 ret
1165 } else {
1166 None
1167 }
1168 })
1169 .collect();
1170
1171 (sources, sinks)
1172 }
1173}