use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;
use std::time::Instant;

use mz_ore::metric;
use mz_ore::metrics::{
    DeleteOnDropCounter, DeleteOnDropGauge, IntCounterVec, IntGaugeVec, MetricsRegistry,
    UIntGaugeVec,
};
use mz_repr::{GlobalId, Timestamp};
use mz_storage_client::statistics::{Gauge, SinkStatisticsUpdate, SourceStatisticsUpdate};
use mz_storage_types::sources::SourceEnvelope;
use prometheus::core::{AtomicI64, AtomicU64};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::progress::frontier::Antichain;

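/// Definitions (i.e. labeled metric families) for the per-source Prometheus
/// metrics tracked in this module. Concrete per-collection instances are
/// created from these in [`SourceStatisticsMetrics::new`].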
#[derive(Clone, Debug)]
pub(crate) struct SourceStatisticsMetricDefs {
    // Counters.
    pub(crate) messages_received: IntCounterVec,
    pub(crate) updates_staged: IntCounterVec,
    pub(crate) updates_committed: IntCounterVec,
    pub(crate) bytes_received: IntCounterVec,

    // Gauges.
    pub(crate) snapshot_committed: UIntGaugeVec,
    pub(crate) bytes_indexed: UIntGaugeVec,
    pub(crate) records_indexed: UIntGaugeVec,
    pub(crate) rehydration_latency_ms: IntGaugeVec,

    pub(crate) offset_known: UIntGaugeVec,
    pub(crate) offset_committed: UIntGaugeVec,
    pub(crate) snapshot_records_known: UIntGaugeVec,
    pub(crate) snapshot_records_staged: UIntGaugeVec,

    // Only reported via Prometheus; not included in `SourceStatisticsUpdate`.
    pub(crate) envelope_state_tombstones: UIntGaugeVec,
}

impl SourceStatisticsMetricDefs {
    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
        Self {
            snapshot_committed: registry.register(metric!(
                name: "mz_source_snapshot_committed",
                help: "Whether or not the worker has committed the initial snapshot for a source.",
                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
            )),
            messages_received: registry.register(metric!(
                name: "mz_source_messages_received",
                help: "The number of raw messages the worker has received from upstream.",
                var_labels: ["source_id", "worker_id", "parent_source_id"],
            )),
            updates_staged: registry.register(metric!(
                name: "mz_source_updates_staged",
                help: "The number of updates (inserts + deletes) the worker has written but not yet committed to the storage layer.",
                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
            )),
            updates_committed: registry.register(metric!(
                name: "mz_source_updates_committed",
                help: "The number of updates (inserts + deletes) the worker has committed into the storage layer.",
                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
            )),
            bytes_received: registry.register(metric!(
                name: "mz_source_bytes_received",
                help: "The number of bytes worth of messages the worker has received from upstream. The way the bytes are counted is source-specific.",
                var_labels: ["source_id", "worker_id", "parent_source_id"],
            )),
            bytes_indexed: registry.register(metric!(
                name: "mz_source_bytes_indexed",
                help: "The number of bytes of the source envelope state kept. This will be specific to the envelope in use.",
                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
            )),
            records_indexed: registry.register(metric!(
                name: "mz_source_records_indexed",
                help: "The number of records in the source envelope state. This will be specific to the envelope in use.",
                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
            )),
            envelope_state_tombstones: registry.register(metric!(
                name: "mz_source_envelope_state_tombstones",
                help: "The number of outstanding tombstones in the source envelope state. This will be specific to the envelope in use.",
                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
            )),
            rehydration_latency_ms: registry.register(metric!(
                name: "mz_source_rehydration_latency_ms",
                help: "The amount of time in milliseconds it took for the worker to rehydrate the source envelope state. This will be specific to the envelope in use.",
                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id", "envelope"],
            )),
            offset_known: registry.register(metric!(
                name: "mz_source_offset_known",
                help: "The total number of _values_ (source-defined unit) present in upstream.",
                var_labels: ["source_id", "worker_id", "shard_id"],
            )),
            offset_committed: registry.register(metric!(
                name: "mz_source_offset_committed",
                help: "The total number of _values_ (source-defined unit) we have fully processed and committed to storage.",
                var_labels: ["source_id", "worker_id", "shard_id"],
            )),
            snapshot_records_known: registry.register(metric!(
                name: "mz_source_snapshot_records_known",
                help: "The total number of records in the source's snapshot.",
                var_labels: ["source_id", "worker_id", "shard_id"],
            )),
            snapshot_records_staged: registry.register(metric!(
                name: "mz_source_snapshot_records_staged",
                help: "The total number of records read from the source's snapshot.",
                var_labels: ["source_id", "worker_id", "shard_id"],
            )),
        }
    }
}

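/// Per-collection Prometheus metrics for a source, created from
/// [`SourceStatisticsMetricDefs`]. Each field is a `DeleteOnDrop*` handle, so
/// the labeled time series is unregistered when these statistics are dropped.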
#[derive(Debug)]
pub struct SourceStatisticsMetrics {
    // Counters.
    pub(crate) messages_received: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) updates_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) updates_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) bytes_received: DeleteOnDropCounter<AtomicU64, Vec<String>>,

    // Gauges.
    pub(crate) snapshot_committed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) bytes_indexed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) records_indexed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) rehydration_latency_ms: DeleteOnDropGauge<AtomicI64, Vec<String>>,

    pub(crate) offset_known: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) offset_committed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) snapshot_records_known: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) snapshot_records_staged: DeleteOnDropGauge<AtomicU64, Vec<String>>,

    // Only reported via Prometheus; not included in `SourceStatisticsUpdate`.
    pub(crate) envelope_state_tombstones: DeleteOnDropGauge<AtomicU64, Vec<String>>,
}

impl SourceStatisticsMetrics {
    pub(crate) fn new(
        defs: &SourceStatisticsMetricDefs,
        id: GlobalId,
        worker_id: usize,
        parent_source_id: GlobalId,
        shard_id: &mz_persist_client::ShardId,
        envelope: SourceEnvelope,
    ) -> SourceStatisticsMetrics {
        let shard = shard_id.to_string();
        let envelope = match envelope {
            SourceEnvelope::None(_) => "none",
            SourceEnvelope::Upsert(_) => "upsert",
            SourceEnvelope::CdcV2 => "cdcv2",
        };

        SourceStatisticsMetrics {
            snapshot_committed: defs.snapshot_committed.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                parent_source_id.to_string(),
                shard.clone(),
            ]),
            messages_received: defs.messages_received.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                parent_source_id.to_string(),
            ]),
            updates_staged: defs.updates_staged.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                parent_source_id.to_string(),
                shard.clone(),
            ]),
            updates_committed: defs.updates_committed.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                parent_source_id.to_string(),
                shard.clone(),
            ]),
            bytes_received: defs.bytes_received.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                parent_source_id.to_string(),
            ]),
            bytes_indexed: defs.bytes_indexed.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                parent_source_id.to_string(),
                shard.clone(),
            ]),
            records_indexed: defs.records_indexed.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                parent_source_id.to_string(),
                shard.clone(),
            ]),
            envelope_state_tombstones: defs.envelope_state_tombstones.get_delete_on_drop_metric(
                vec![
                    id.to_string(),
                    worker_id.to_string(),
                    parent_source_id.to_string(),
                    shard.clone(),
                ],
            ),
            rehydration_latency_ms: defs.rehydration_latency_ms.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                parent_source_id.to_string(),
                shard.clone(),
                envelope.to_string(),
            ]),
            offset_known: defs.offset_known.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                parent_source_id.to_string(),
            ]),
            offset_committed: defs.offset_committed.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                parent_source_id.to_string(),
            ]),
            snapshot_records_known: defs.snapshot_records_known.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                parent_source_id.to_string(),
            ]),
            snapshot_records_staged: defs.snapshot_records_staged.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                parent_source_id.to_string(),
            ]),
        }
    }
}

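/// Definitions (i.e. labeled metric families) for the per-sink Prometheus
/// metrics tracked in this module.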
#[derive(Clone, Debug)]
pub(crate) struct SinkStatisticsMetricDefs {
    // Counters.
    pub(crate) messages_staged: IntCounterVec,
    pub(crate) messages_committed: IntCounterVec,
    pub(crate) bytes_staged: IntCounterVec,
    pub(crate) bytes_committed: IntCounterVec,
}

impl SinkStatisticsMetricDefs {
    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
        Self {
            messages_staged: registry.register(metric!(
                name: "mz_sink_messages_staged",
                help: "The number of messages staged but possibly not committed to the sink.",
                var_labels: ["sink_id", "worker_id"],
            )),
            messages_committed: registry.register(metric!(
                name: "mz_sink_messages_committed",
                help: "The number of messages committed to the sink.",
                var_labels: ["sink_id", "worker_id"],
            )),
            bytes_staged: registry.register(metric!(
                name: "mz_sink_bytes_staged",
                help: "The number of bytes staged but possibly not committed to the sink.",
                var_labels: ["sink_id", "worker_id"],
            )),
            bytes_committed: registry.register(metric!(
                name: "mz_sink_bytes_committed",
                help: "The number of bytes committed to the sink.",
                var_labels: ["sink_id", "worker_id"],
            )),
        }
    }
}

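/// Per-collection Prometheus metrics for a sink, created from
/// [`SinkStatisticsMetricDefs`].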
#[derive(Debug)]
pub struct SinkStatisticsMetrics {
    // Counters.
    pub(crate) messages_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) messages_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) bytes_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) bytes_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
}

impl SinkStatisticsMetrics {
    pub(crate) fn new(
        defs: &SinkStatisticsMetricDefs,
        id: GlobalId,
        worker_id: usize,
    ) -> SinkStatisticsMetrics {
        SinkStatisticsMetrics {
            messages_staged: defs
                .messages_staged
                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
            messages_committed: defs
                .messages_committed
                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
            bytes_staged: defs
                .bytes_staged
                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
            bytes_committed: defs
                .bytes_committed
                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
        }
    }
}

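/// Meta-data needed to maintain source statistics.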
#[derive(Debug, Clone)]
pub struct SourceStatisticsMetadata {
    /// The frontier the source is resuming from, used to decide when
    /// rehydration has finished.
    resume_upper: Antichain<Timestamp>,
    /// The moment these statistics were created, used to measure rehydration
    /// latency.
    created_at: Instant,
}

impl SourceStatisticsMetadata {
    /// Create a new `SourceStatisticsMetadata` object.
    pub fn new(resume_upper: Antichain<Timestamp>) -> Self {
        Self {
            resume_upper,
            created_at: Instant::now(),
        }
    }
}

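/// A bundle of a raw statistics record for a collection and its corresponding
/// Prometheus metrics, which are updated in lockstep.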
#[derive(Debug)]
struct StatsInner<Stats, Metrics> {
    stats: Stats,
    prom: Metrics,
}

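/// A helper for tracking statistics for a storage collection alongside its
/// Prometheus metrics. Cloning is shallow: all clones share the same
/// underlying record via `Rc<RefCell<_>>`, so a `StorageStatistics` is only
/// meant to be shared within a single timely worker.
///
/// A minimal sketch of the sharing behavior (illustrative only; `stats` is
/// assumed to be an already-constructed `SourceStatistics`):
///
/// ```ignore
/// let clone = stats.clone();
/// clone.inc_messages_received_by(1);
/// // Both handles observe the same underlying record; `snapshot` returns
/// // `Some` only once every gauge has been initialized.
/// if let Some(record) = stats.snapshot() {
///     assert_eq!(record.messages_received, 1);
/// }
/// ```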
#[derive(Debug)]
pub struct StorageStatistics<Stats, Metrics, Meta> {
    /// The current statistics record and its associated Prometheus metrics,
    /// shared between all clones.
    stats: Rc<RefCell<StatsInner<Stats, Metrics>>>,
    /// Meta-data needed to maintain these statistics.
    meta: Meta,
}

impl<Stats, Metrics, Meta: Clone> Clone for StorageStatistics<Stats, Metrics, Meta> {
    fn clone(&self) -> Self {
        Self {
            stats: Rc::clone(&self.stats),
            meta: self.meta.clone(),
        }
    }
}

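/// A raw record of source statistics on a given worker.
///
/// Counters hold the delta accumulated since the last snapshot. Gauges are
/// wrapped in `Option`s: the outer `Option` is `None` until the gauge has
/// been (re-)initialized, and `snapshot` refuses to report anything until
/// every gauge is initialized. A nested `Option<Option<_>>` additionally
/// distinguishes "initialized but no value known yet" (`Some(None)`) from a
/// known value (`Some(Some(v))`).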
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SourceStatisticsRecord {
    id: GlobalId,
    worker_id: usize,
    // Counters: deltas since the last snapshot.
    messages_received: u64,
    bytes_received: u64,
    updates_staged: u64,
    updates_committed: u64,

    // Gauges: `None` means the gauge is uninitialized and the record must not
    // be reported yet.
    records_indexed: Option<u64>,
    bytes_indexed: Option<u64>,

    // `Some(None)` means initialized but with no known value yet.
    rehydration_latency_ms: Option<Option<i64>>,

    snapshot_records_known: Option<Option<u64>>,
    snapshot_records_staged: Option<Option<u64>>,
    snapshot_committed: Option<bool>,
    offset_known: Option<Option<u64>>,
    offset_committed: Option<Option<u64>>,

    // Only reported via Prometheus; not included in `SourceStatisticsUpdate`.
    envelope_state_tombstones: u64,
}

impl SourceStatisticsRecord {
    fn reset_gauges(&mut self) {
        // Uninitialized until the source reports them again.
        self.rehydration_latency_ms = None;
        self.snapshot_committed = None;

        // Reset to 0; updated incrementally by the source.
        self.bytes_indexed = Some(0);
        self.records_indexed = Some(0);

        // Initialized, but with no known value yet.
        self.snapshot_records_known = Some(None);
        self.snapshot_records_staged = Some(None);
        self.offset_known = Some(None);
        self.offset_committed = Some(None);

        self.envelope_state_tombstones = 0;
    }

    /// Reset the counters so that subsequent reports cover only the delta
    /// accumulated since the last snapshot.
    fn reset_counters(&mut self) {
        self.messages_received = 0;
        self.bytes_received = 0;
        self.updates_staged = 0;
        self.updates_committed = 0;
    }

    /// Convert this record into a `SourceStatisticsUpdate` to be merged with
    /// other workers' updates. Panics if any gauge is uninitialized, which
    /// `SourceStatistics::snapshot` guarantees cannot happen.
    fn as_update(&self) -> SourceStatisticsUpdate {
        let SourceStatisticsRecord {
            id,
            worker_id: _,
            messages_received,
            bytes_received,
            updates_staged,
            updates_committed,
            records_indexed,
            bytes_indexed,
            rehydration_latency_ms,
            snapshot_records_known,
            snapshot_records_staged,
            snapshot_committed,
            offset_known,
            offset_committed,
            envelope_state_tombstones: _,
        } = self.clone();

        SourceStatisticsUpdate {
            id,
            messages_received: messages_received.into(),
            bytes_received: bytes_received.into(),
            updates_staged: updates_staged.into(),
            updates_committed: updates_committed.into(),
            records_indexed: Gauge::gauge(records_indexed.unwrap()),
            bytes_indexed: Gauge::gauge(bytes_indexed.unwrap()),
            rehydration_latency_ms: Gauge::gauge(rehydration_latency_ms.unwrap()),
            snapshot_records_known: Gauge::gauge(snapshot_records_known.unwrap()),
            snapshot_records_staged: Gauge::gauge(snapshot_records_staged.unwrap()),
            snapshot_committed: Gauge::gauge(snapshot_committed.unwrap()),
            offset_known: Gauge::gauge(offset_known.unwrap()),
            offset_committed: Gauge::gauge(offset_committed.unwrap()),
        }
    }
}

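/// A raw record of sink statistics on a given worker. All fields are counters
/// holding the delta accumulated since the last snapshot; sinks currently
/// report no gauges.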
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SinkStatisticsRecord {
    id: GlobalId,
    worker_id: usize,
    // Counters: deltas since the last snapshot.
    messages_staged: u64,
    messages_committed: u64,
    bytes_staged: u64,
    bytes_committed: u64,
}

impl SinkStatisticsRecord {
    /// A no-op: sinks currently have no gauges. Kept so that the source and
    /// sink code paths stay symmetric.
    fn reset_gauges(&self) {}

    /// Reset the counters so that subsequent reports cover only the delta
    /// accumulated since the last snapshot.
    fn reset_counters(&mut self) {
        self.messages_staged = 0;
        self.messages_committed = 0;
        self.bytes_staged = 0;
        self.bytes_committed = 0;
    }

    /// Convert this record into a `SinkStatisticsUpdate` to be merged with
    /// other workers' updates.
    fn as_update(&self) -> SinkStatisticsUpdate {
        let SinkStatisticsRecord {
            id,
            worker_id: _,
            messages_staged,
            messages_committed,
            bytes_staged,
            bytes_committed,
        } = self.clone();

        SinkStatisticsUpdate {
            id,
            messages_staged: messages_staged.into(),
            messages_committed: messages_committed.into(),
            bytes_staged: bytes_staged.into(),
            bytes_committed: bytes_committed.into(),
        }
    }
}

/// Statistics maintained for sources.
pub type SourceStatistics =
    StorageStatistics<SourceStatisticsRecord, SourceStatisticsMetrics, SourceStatisticsMetadata>;

/// Statistics maintained for sinks.
pub type SinkStatistics = StorageStatistics<SinkStatisticsRecord, SinkStatisticsMetrics, ()>;

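// A minimal sketch of the intended per-worker flow (names here are
// illustrative, not part of this module's API):
//
//     let stats = SourceStatistics::new(
//         id, worker_id, &defs, parent_id, &shard_id, envelope, resume_upper,
//     );
//     stats.inc_messages_received_by(1);
//     stats.update_snapshot_committed(&upper);
//     // `snapshot` yields a record only once every gauge is initialized.
//     if let Some(record) = stats.snapshot() {
//         // Ship `record` to worker 0 for aggregation.
//     }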
impl SourceStatistics {
    pub(crate) fn new(
        id: GlobalId,
        worker_id: usize,
        metrics: &SourceStatisticsMetricDefs,
        parent_source_id: GlobalId,
        shard_id: &mz_persist_client::ShardId,
        envelope: SourceEnvelope,
        resume_upper: Antichain<Timestamp>,
    ) -> Self {
        Self {
            stats: Rc::new(RefCell::new(StatsInner {
                stats: SourceStatisticsRecord {
                    id,
                    worker_id,
                    messages_received: 0,
                    updates_staged: 0,
                    updates_committed: 0,
                    bytes_received: 0,
                    records_indexed: Some(0),
                    bytes_indexed: Some(0),
                    rehydration_latency_ms: None,
                    snapshot_records_staged: Some(None),
                    snapshot_records_known: Some(None),
                    snapshot_committed: None,
                    offset_known: Some(None),
                    offset_committed: Some(None),
                    envelope_state_tombstones: 0,
                },
                prom: SourceStatisticsMetrics::new(
                    metrics,
                    id,
                    worker_id,
                    parent_source_id,
                    shard_id,
                    envelope,
                ),
            })),
            meta: SourceStatisticsMetadata::new(resume_upper),
        }
    }

    /// Reset the gauges to their (re-)initialization values.
    pub fn reset_gauges(&self) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.reset_gauges();
    }

    /// Take a snapshot of the current statistics and reset the counters, so
    /// that subsequent snapshots report deltas. Returns `None` if any gauge
    /// is not yet initialized.
    pub fn snapshot(&self) -> Option<SourceStatisticsRecord> {
        let mut cur = self.stats.borrow_mut();

        match &cur.stats {
            SourceStatisticsRecord {
                records_indexed: Some(_),
                bytes_indexed: Some(_),
                rehydration_latency_ms: Some(_),
                snapshot_records_known: Some(_),
                snapshot_records_staged: Some(_),
                snapshot_committed: Some(_),
                offset_known: Some(_),
                offset_committed: Some(_),
                ..
            } => {
                let ret = Some(cur.stats.clone());
                cur.stats.reset_counters();
                ret
            }
            _ => None,
        }
    }

    /// Initialize the `snapshot_committed` gauge from the collection's
    /// current upper.
    pub fn initialize_snapshot_committed(&self, upper: &Antichain<Timestamp>) {
        self.update_snapshot_committed(upper);
    }

    /// Set the `snapshot_committed` gauge. The snapshot counts as committed
    /// as soon as the upper differs from the initial `[Timestamp::MIN]`
    /// frontier.
    pub fn update_snapshot_committed(&self, upper: &Antichain<Timestamp>) {
        let value = *upper != Antichain::from_elem(Timestamp::MIN);
        let mut cur = self.stats.borrow_mut();
        cur.stats.snapshot_committed = Some(value);
        cur.prom.snapshot_committed.set(if value { 1 } else { 0 });
    }

    /// Increment the `messages_received` counter.
    pub fn inc_messages_received_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.messages_received += value;
        cur.prom.messages_received.inc_by(value);
    }

    /// Increment the `updates_staged` counter.
    pub fn inc_updates_staged_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.updates_staged += value;
        cur.prom.updates_staged.inc_by(value);
    }

    /// Increment the `updates_committed` counter.
    pub fn inc_updates_committed_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.updates_committed += value;
        cur.prom.updates_committed.inc_by(value);
    }

    /// Increment the `bytes_received` counter.
    pub fn inc_bytes_received_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.bytes_received += value;
        cur.prom.bytes_received.inc_by(value);
    }

    /// Update the `bytes_indexed` gauge by the given delta, clamping to 0 on
    /// overflow or underflow.
    pub fn update_bytes_indexed_by(&self, value: i64) {
        let mut cur = self.stats.borrow_mut();
        if let Some(updated) = cur
            .stats
            .bytes_indexed
            .unwrap_or(0)
            .checked_add_signed(value)
        {
            cur.stats.bytes_indexed = Some(updated);
            cur.prom.bytes_indexed.set(updated);
        } else {
            let bytes_indexed = cur.stats.bytes_indexed.unwrap_or(0);
            tracing::warn!(
                "Unexpected u64 overflow/underflow while updating bytes_indexed value {} with {}",
                bytes_indexed,
                value
            );
            cur.stats.bytes_indexed = Some(0);
            cur.prom.bytes_indexed.set(0);
        }
    }

    /// Set the `bytes_indexed` gauge to an absolute value, clamping negative
    /// inputs to 0.
    pub fn set_bytes_indexed(&self, value: i64) {
        let mut cur = self.stats.borrow_mut();
        let value = if value < 0 {
            tracing::warn!("Unexpected negative value for bytes_indexed {}", value);
            0
        } else {
            value.unsigned_abs()
        };
        cur.stats.bytes_indexed = Some(value);
        cur.prom.bytes_indexed.set(value);
    }

    /// Update the `records_indexed` gauge by the given delta, clamping to 0
    /// on overflow or underflow.
    pub fn update_records_indexed_by(&self, value: i64) {
        let mut cur = self.stats.borrow_mut();
        if let Some(updated) = cur
            .stats
            .records_indexed
            .unwrap_or(0)
            .checked_add_signed(value)
        {
            cur.stats.records_indexed = Some(updated);
            cur.prom.records_indexed.set(updated);
        } else {
            let records_indexed = cur.stats.records_indexed.unwrap_or(0);
            tracing::warn!(
                "Unexpected u64 overflow/underflow while updating records_indexed value {} with {}",
                records_indexed,
                value
            );
            cur.stats.records_indexed = Some(0);
            cur.prom.records_indexed.set(0);
        }
    }

    /// Set the `records_indexed` gauge to an absolute value, clamping
    /// negative inputs to 0.
    pub fn set_records_indexed(&self, value: i64) {
        let mut cur = self.stats.borrow_mut();
        let value = if value < 0 {
            tracing::warn!("Unexpected negative value for records_indexed {}", value);
            0
        } else {
            value.unsigned_abs()
        };
        cur.stats.records_indexed = Some(value);
        cur.prom.records_indexed.set(value);
    }

    /// Mark the `rehydration_latency_ms` gauge as initialized but not yet
    /// measured.
    pub fn initialize_rehydration_latency_ms(&self) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.rehydration_latency_ms = Some(None);
    }

    /// Update the `envelope_state_tombstones` gauge by the given delta,
    /// clamping to 0 on overflow or underflow.
    pub fn update_envelope_state_tombstones_by(&self, value: i64) {
        let mut cur = self.stats.borrow_mut();
        if let Some(updated) = cur
            .stats
            .envelope_state_tombstones
            .checked_add_signed(value)
        {
            cur.stats.envelope_state_tombstones = updated;
            cur.prom.envelope_state_tombstones.set(updated);
        } else {
            let envelope_state_tombstones = cur.stats.envelope_state_tombstones;
            tracing::warn!(
                "Unexpected u64 overflow/underflow while updating envelope_state_tombstones value {} with {}",
                envelope_state_tombstones,
                value
            );
            cur.stats.envelope_state_tombstones = 0;
            cur.prom.envelope_state_tombstones.set(0);
        }
    }

    /// Record the rehydration latency once the collection's upper passes the
    /// resumption frontier, i.e. once rehydration has finished.
    pub fn update_rehydration_latency_ms(&self, upper: &Antichain<Timestamp>) {
        let mut cur = self.stats.borrow_mut();

        if matches!(cur.stats.rehydration_latency_ms, Some(Some(_))) {
            // The latency has already been recorded.
            return;
        }
        if !PartialOrder::less_than(&self.meta.resume_upper, upper) {
            // We have not yet advanced past the resumption frontier.
            return;
        }

        let elapsed = self.meta.created_at.elapsed();
        let value = elapsed
            .as_millis()
            .try_into()
            .expect("Rehydration took more than ~292 million years!");
        cur.stats.rehydration_latency_ms = Some(Some(value));
        cur.prom.rehydration_latency_ms.set(value);
    }

    /// Set the `offset_known` gauge.
    pub fn set_offset_known(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.offset_known = Some(Some(value));
        cur.prom.offset_known.set(value);
    }

    /// Set the `offset_committed` gauge.
    pub fn set_offset_committed(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.offset_committed = Some(Some(value));
        cur.prom.offset_committed.set(value);
    }

    /// Set the `snapshot_records_known` gauge.
    pub fn set_snapshot_records_known(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.snapshot_records_known = Some(Some(value));
        cur.prom.snapshot_records_known.set(value);
    }

    /// Set the `snapshot_records_staged` gauge.
    pub fn set_snapshot_records_staged(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.snapshot_records_staged = Some(Some(value));
        cur.prom.snapshot_records_staged.set(value);
    }
}

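// The sink flow mirrors the source flow, minus gauges (illustrative only):
//
//     let stats = SinkStatistics::new(id, worker_id, &defs);
//     stats.inc_messages_staged_by(1);
//     let record = stats.snapshot(); // always `Some`: sinks have no gauges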
impl SinkStatistics {
    pub(crate) fn new(id: GlobalId, worker_id: usize, metrics: &SinkStatisticsMetricDefs) -> Self {
        Self {
            stats: Rc::new(RefCell::new(StatsInner {
                stats: SinkStatisticsRecord {
                    id,
                    worker_id,
                    messages_staged: 0,
                    messages_committed: 0,
                    bytes_staged: 0,
                    bytes_committed: 0,
                },
                prom: SinkStatisticsMetrics::new(metrics, id, worker_id),
            })),
            meta: (),
        }
    }

    /// Reset the gauges; currently a no-op, as sinks report no gauges.
    pub fn reset_gauges(&self) {
        let cur = self.stats.borrow_mut();
        cur.stats.reset_gauges();
    }

    /// Take a snapshot of the current statistics and reset the counters, so
    /// that subsequent snapshots report deltas. The irrefutable match mirrors
    /// the gauge-initialization check in `SourceStatistics::snapshot`.
    pub fn snapshot(&self) -> Option<SinkStatisticsRecord> {
        let mut cur = self.stats.borrow_mut();

        match &cur.stats {
            SinkStatisticsRecord { .. } => {
                let ret = Some(cur.stats.clone());
                cur.stats.reset_counters();
                ret
            }
        }
    }

    /// Increment the `messages_staged` counter.
    pub fn inc_messages_staged_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.messages_staged += value;
        cur.prom.messages_staged.inc_by(value);
    }

    /// Increment the `bytes_staged` counter.
    pub fn inc_bytes_staged_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.bytes_staged += value;
        cur.prom.bytes_staged.inc_by(value);
    }

    /// Increment the `messages_committed` counter.
    pub fn inc_messages_committed_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.messages_committed += value;
        cur.prom.messages_committed.inc_by(value);
    }

    /// Increment the `bytes_committed` counter.
    pub fn inc_bytes_committed_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.bytes_committed += value;
        cur.prom.bytes_committed.inc_by(value);
    }
}

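/// A helper that aggregates per-worker statistics into whole-collection
/// updates on worker 0.
///
/// Local statistics are paired with an epoch that is bumped whenever a
/// collection is re-initialized (e.g. when its dataflow restarts). Worker 0
/// additionally keeps a per-collection global epoch and one update slot per
/// worker; `ingest` uses the epochs to discard stale gauge values from
/// records produced before the latest re-initialization, while still
/// accumulating their counters.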
pub struct AggregatedStatistics {
    worker_id: usize,
    worker_count: usize,
    /// Per-collection local statistics for this worker, paired with their
    /// epoch.
    local_source_statistics: BTreeMap<GlobalId, (usize, SourceStatistics)>,
    local_sink_statistics: BTreeMap<GlobalId, (usize, SinkStatistics)>,

    /// Per-collection global state, only populated on worker 0: the current
    /// epoch and one update slot per worker.
    global_source_statistics: BTreeMap<GlobalId, (usize, Vec<Option<SourceStatisticsUpdate>>)>,
    global_sink_statistics: BTreeMap<GlobalId, (usize, Vec<Option<SinkStatisticsUpdate>>)>,
}

impl AggregatedStatistics {
    /// Initialize aggregated statistics for this worker.
    pub fn new(worker_id: usize, worker_count: usize) -> Self {
        AggregatedStatistics {
            worker_id,
            worker_count,
            local_source_statistics: Default::default(),
            local_sink_statistics: Default::default(),
            global_source_statistics: Default::default(),
            global_sink_statistics: Default::default(),
        }
    }

    /// Get a `SourceStatistics` handle for the given id, if it exists.
    pub fn get_source(&self, id: &GlobalId) -> Option<&SourceStatistics> {
        self.local_source_statistics.get(id).map(|(_, s)| s)
    }

    /// Get a `SinkStatistics` handle for the given id, if it exists.
    pub fn get_sink(&self, id: &GlobalId) -> Option<&SinkStatistics> {
        self.local_sink_statistics.get(id).map(|(_, s)| s)
    }

    /// Deinitialize all local and global state for the given collection.
    pub fn deinitialize(&mut self, id: GlobalId) {
        self.local_source_statistics.remove(&id);
        self.local_sink_statistics.remove(&id);
        self.global_source_statistics.remove(&id);
        self.global_sink_statistics.remove(&id);
    }

    /// Advance the global epoch for the given collection and reset the gauges
    /// of all previously ingested updates, so that stale gauge values from
    /// before the epoch bump are not re-reported.
    pub fn advance_global_epoch(&mut self, id: GlobalId) {
        if let Some((epoch, stats)) = self.global_source_statistics.get_mut(&id) {
            *epoch += 1;
            for update in stats.iter_mut().flatten() {
                update.reset_gauges();
            }
        }
        if let Some((epoch, stats)) = self.global_sink_statistics.get_mut(&id) {
            *epoch += 1;
            for update in stats.iter_mut().flatten() {
                update.reset_gauges();
            }
        }
    }

    /// Initialize statistics for a source. If statistics already exist for
    /// this id, bump their epoch, reset their gauges, and refresh their
    /// metadata instead of replacing them.
    pub fn initialize_source<F: FnOnce() -> SourceStatistics>(
        &mut self,
        id: GlobalId,
        resume_upper: Antichain<Timestamp>,
        stats: F,
    ) {
        self.local_source_statistics
            .entry(id)
            .and_modify(|(epoch, stats)| {
                *epoch += 1;
                stats.reset_gauges();
                stats.meta = SourceStatisticsMetadata::new(resume_upper);
            })
            .or_insert_with(|| (0, stats()));

        // Only worker 0 maintains global, cross-worker state.
        if self.worker_id == 0 {
            self.global_source_statistics
                .entry(id)
                .or_insert_with(|| (0, vec![None; self.worker_count]));
        }
    }

    /// Initialize statistics for a sink. If statistics already exist for this
    /// id, bump their epoch and reset their gauges instead of replacing them.
    pub fn initialize_sink<F: FnOnce() -> SinkStatistics>(&mut self, id: GlobalId, stats: F) {
        self.local_sink_statistics
            .entry(id)
            .and_modify(|(epoch, stats)| {
                *epoch += 1;
                stats.reset_gauges();
            })
            .or_insert_with(|| (0, stats()));
        if self.worker_id == 0 {
            self.global_sink_statistics
                .entry(id)
                .or_insert_with(|| (0, vec![None; self.worker_count]));
        }
    }

    /// Ingest per-worker records, merging them into the global per-worker
    /// update slots. Only worker 0 ingests anything.
    pub fn ingest(
        &mut self,
        source_statistics: Vec<(usize, SourceStatisticsRecord)>,
        sink_statistics: Vec<(usize, SinkStatisticsRecord)>,
    ) {
        if self.worker_id != 0 {
            return;
        }

        for (epoch, stat) in source_statistics {
            if let Some((global_epoch, stats)) = self.global_source_statistics.get_mut(&stat.id) {
                // A record from an older epoch may still carry valid counter
                // deltas, but its gauges are stale and must be discarded.
                let epoch_match = epoch >= *global_epoch;
                let mut update = stat.as_update();
                match (&mut stats[stat.worker_id], epoch_match) {
                    (None, true) => stats[stat.worker_id] = Some(update),
                    (None, false) => {
                        update.reset_gauges();
                        stats[stat.worker_id] = Some(update);
                    }
                    (Some(occupied), true) => occupied.incorporate(update),
                    (Some(occupied), false) => occupied.incorporate_counters(update),
                }
            }
        }

        for (epoch, stat) in sink_statistics {
            if let Some((global_epoch, stats)) = self.global_sink_statistics.get_mut(&stat.id) {
                let epoch_match = epoch >= *global_epoch;
                let update = stat.as_update();
                match (&mut stats[stat.worker_id], epoch_match) {
                    (None, true) => stats[stat.worker_id] = Some(update),
                    (None, false) => {
                        update.reset_gauges();
                        stats[stat.worker_id] = Some(update);
                    }
                    (Some(occupied), true) => occupied.incorporate(update),
                    (Some(occupied), false) => occupied.incorporate_counters(update),
                }
            }
        }
    }

    fn _emit_local(
        &mut self,
    ) -> (
        Vec<(usize, SourceStatisticsRecord)>,
        Vec<(usize, SinkStatisticsRecord)>,
    ) {
        let sources = self
            .local_source_statistics
            .values_mut()
            .flat_map(|(epoch, s)| s.snapshot().map(|v| (*epoch, v)))
            .collect();

        let sinks = self
            .local_sink_statistics
            .values_mut()
            .flat_map(|(epoch, s)| s.snapshot().map(|v| (*epoch, v)))
            .collect();

        (sources, sinks)
    }

    /// Emit this worker's local statistics records, to be shipped to worker 0.
    pub fn emit_local(
        &mut self,
    ) -> (
        Vec<(usize, SourceStatisticsRecord)>,
        Vec<(usize, SinkStatisticsRecord)>,
    ) {
        // Worker 0 ingests its own local statistics directly in `snapshot`,
        // so it emits nothing here.
        if self.worker_id == 0 {
            return (Vec::new(), Vec::new());
        }

        self._emit_local()
    }

    /// Produce aggregated updates for all collections whose statistics have
    /// been reported by every worker. Only worker 0 produces anything.
    pub fn snapshot(&mut self) -> (Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>) {
        if self.worker_id != 0 {
            return (Vec::new(), Vec::new());
        }

        let (sources, sinks) = self._emit_local();
        self.ingest(sources, sinks);

        let sources = self
            .global_source_statistics
            .iter_mut()
            .filter_map(|(_, (_, s))| {
                // Only report once every worker has contributed an update.
                if s.iter().all(|s| s.is_some()) {
                    let ret = Some(SourceStatisticsUpdate::summarize(|| {
                        s.iter().filter_map(Option::as_ref)
                    }));

                    // Reset the counters so subsequent reports cover deltas.
                    s.iter_mut().for_each(|s| {
                        if let Some(s) = s {
                            s.reset_counters();
                        }
                    });
                    ret
                } else {
                    None
                }
            })
            .collect();

        let sinks = self
            .global_sink_statistics
            .iter_mut()
            .filter_map(|(_, (_, s))| {
                if s.iter().all(|s| s.is_some()) {
                    let ret = Some(SinkStatisticsUpdate::summarize(|| {
                        s.iter().filter_map(Option::as_ref)
                    }));

                    s.iter_mut().for_each(|s| {
                        if let Some(s) = s {
                            s.reset_counters();
                        }
                    });
                    ret
                } else {
                    None
                }
            })
            .collect();

        (sources, sinks)
    }
}