use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;
use std::time::Instant;

use mz_ore::metric;
use mz_ore::metrics::{
    DeleteOnDropCounter, DeleteOnDropGauge, IntCounterVec, IntGaugeVec, MetricsRegistry,
    UIntGaugeVec,
};
use mz_repr::{GlobalId, Timestamp};
use mz_storage_client::statistics::{Gauge, SinkStatisticsUpdate, SourceStatisticsUpdate};
use mz_storage_types::sources::SourceEnvelope;
use prometheus::core::{AtomicI64, AtomicU64};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::progress::frontier::Antichain;

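/// Definitions of the `prometheus` metrics that mirror the per-source
/// statistics. Concrete per-source label values are bound in
/// `SourceStatisticsMetrics::new`.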
#[derive(Clone, Debug)]
pub(crate) struct SourceStatisticsMetricDefs {
    pub(crate) messages_received: IntCounterVec,
    pub(crate) updates_staged: IntCounterVec,
    pub(crate) updates_committed: IntCounterVec,
    pub(crate) bytes_received: IntCounterVec,

    pub(crate) snapshot_committed: UIntGaugeVec,
    pub(crate) bytes_indexed: UIntGaugeVec,
    pub(crate) records_indexed: UIntGaugeVec,
    pub(crate) rehydration_latency_ms: IntGaugeVec,

    pub(crate) offset_known: UIntGaugeVec,
    pub(crate) offset_committed: UIntGaugeVec,
    pub(crate) snapshot_records_known: UIntGaugeVec,
    pub(crate) snapshot_records_staged: UIntGaugeVec,

    pub(crate) envelope_state_tombstones: UIntGaugeVec,
}

impl SourceStatisticsMetricDefs {
    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
        Self {
            snapshot_committed: registry.register(metric!(
                name: "mz_source_snapshot_committed",
                help: "Whether or not the worker has committed the initial snapshot for a source.",
                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
            )),
            messages_received: registry.register(metric!(
                name: "mz_source_messages_received",
                help: "The number of raw messages the worker has received from upstream.",
                var_labels: ["source_id", "worker_id", "parent_source_id"],
            )),
            updates_staged: registry.register(metric!(
                name: "mz_source_updates_staged",
                help: "The number of updates (inserts + deletes) the worker has written but not yet committed to the storage layer.",
                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
            )),
            updates_committed: registry.register(metric!(
                name: "mz_source_updates_committed",
                help: "The number of updates (inserts + deletes) the worker has committed into the storage layer.",
                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
            )),
            bytes_received: registry.register(metric!(
                name: "mz_source_bytes_received",
                help: "The number of bytes worth of messages the worker has received from upstream. The way the bytes are counted is source-specific.",
                var_labels: ["source_id", "worker_id", "parent_source_id"],
            )),
            bytes_indexed: registry.register(metric!(
                name: "mz_source_bytes_indexed",
                help: "The number of bytes of the source envelope state kept. This will be specific to the envelope in use.",
                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
            )),
            records_indexed: registry.register(metric!(
                name: "mz_source_records_indexed",
                help: "The number of records in the source envelope state. This will be specific to the envelope in use.",
                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
            )),
            envelope_state_tombstones: registry.register(metric!(
                name: "mz_source_envelope_state_tombstones",
                help: "The number of outstanding tombstones in the source envelope state. This will be specific to the envelope in use.",
                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
            )),
            rehydration_latency_ms: registry.register(metric!(
                name: "mz_source_rehydration_latency_ms",
                help: "The amount of time in milliseconds it took for the worker to rehydrate the source envelope state. This will be specific to the envelope in use.",
                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id", "envelope"],
            )),
            offset_known: registry.register(metric!(
                name: "mz_source_offset_known",
                help: "The total number of _values_ (source-defined unit) present in upstream.",
                var_labels: ["source_id", "worker_id", "shard_id"],
            )),
            offset_committed: registry.register(metric!(
                name: "mz_source_offset_committed",
                help: "The total number of _values_ (source-defined unit) we have fully processed and committed to storage.",
                var_labels: ["source_id", "worker_id", "shard_id"],
            )),
            snapshot_records_known: registry.register(metric!(
                name: "mz_source_snapshot_records_known",
                help: "The total number of records in the source's snapshot.",
                var_labels: ["source_id", "worker_id", "shard_id", "parent_source_id"],
            )),
            snapshot_records_staged: registry.register(metric!(
                name: "mz_source_snapshot_records_staged",
                help: "The total number of records read from the source's snapshot.",
                var_labels: ["source_id", "worker_id", "shard_id", "parent_source_id"],
            )),
        }
    }
}

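/// Prometheus metrics for a single source and worker, created from
/// `SourceStatisticsMetricDefs` with concrete label values. The
/// `DeleteOnDrop*` wrappers unregister the labeled series when the source is
/// dropped.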
#[derive(Debug)]
pub struct SourceStatisticsMetrics {
    pub(crate) messages_received: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) updates_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) updates_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) bytes_received: DeleteOnDropCounter<AtomicU64, Vec<String>>,

    pub(crate) snapshot_committed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) bytes_indexed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) records_indexed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) rehydration_latency_ms: DeleteOnDropGauge<AtomicI64, Vec<String>>,

    pub(crate) offset_known: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) offset_committed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) snapshot_records_known: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) snapshot_records_staged: DeleteOnDropGauge<AtomicU64, Vec<String>>,

    pub(crate) envelope_state_tombstones: DeleteOnDropGauge<AtomicU64, Vec<String>>,
}

impl SourceStatisticsMetrics {
    pub(crate) fn new(
        defs: &SourceStatisticsMetricDefs,
        id: GlobalId,
        worker_id: usize,
        parent_source_id: GlobalId,
        shard_id: &mz_persist_client::ShardId,
        envelope: SourceEnvelope,
    ) -> SourceStatisticsMetrics {
        let shard = shard_id.to_string();
        let envelope = match envelope {
            SourceEnvelope::None(_) => "none",
            SourceEnvelope::Upsert(_) => "upsert",
            SourceEnvelope::CdcV2 => "cdcv2",
        };

        SourceStatisticsMetrics {
            snapshot_committed: defs.snapshot_committed.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                parent_source_id.to_string(),
                shard.clone(),
            ]),
            messages_received: defs.messages_received.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                parent_source_id.to_string(),
            ]),
            updates_staged: defs.updates_staged.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                parent_source_id.to_string(),
                shard.clone(),
            ]),
            updates_committed: defs.updates_committed.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                parent_source_id.to_string(),
                shard.clone(),
            ]),
            bytes_received: defs.bytes_received.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                parent_source_id.to_string(),
            ]),
            bytes_indexed: defs.bytes_indexed.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                parent_source_id.to_string(),
                shard.clone(),
            ]),
            records_indexed: defs.records_indexed.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                parent_source_id.to_string(),
                shard.clone(),
            ]),
            envelope_state_tombstones: defs.envelope_state_tombstones.get_delete_on_drop_metric(
                vec![
                    id.to_string(),
                    worker_id.to_string(),
                    parent_source_id.to_string(),
                    shard.clone(),
                ],
            ),
            rehydration_latency_ms: defs.rehydration_latency_ms.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                parent_source_id.to_string(),
                shard.clone(),
                envelope.to_string(),
            ]),
            offset_known: defs.offset_known.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                shard.clone(),
            ]),
            offset_committed: defs.offset_committed.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                shard.clone(),
            ]),
            snapshot_records_known: defs.snapshot_records_known.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                shard.clone(),
                parent_source_id.to_string(),
            ]),
            snapshot_records_staged: defs.snapshot_records_staged.get_delete_on_drop_metric(vec![
                id.to_string(),
                worker_id.to_string(),
                shard.clone(),
                parent_source_id.to_string(),
            ]),
        }
    }
}

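/// Definitions of the `prometheus` metrics that mirror the per-sink
/// statistics.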
#[derive(Clone, Debug)]
pub(crate) struct SinkStatisticsMetricDefs {
    pub(crate) messages_staged: IntCounterVec,
    pub(crate) messages_committed: IntCounterVec,
    pub(crate) bytes_staged: IntCounterVec,
    pub(crate) bytes_committed: IntCounterVec,
}

impl SinkStatisticsMetricDefs {
    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
        Self {
            messages_staged: registry.register(metric!(
                name: "mz_sink_messages_staged",
                help: "The number of messages staged but possibly not committed to the sink.",
                var_labels: ["sink_id", "worker_id"],
            )),
            messages_committed: registry.register(metric!(
                name: "mz_sink_messages_committed",
                help: "The number of messages committed to the sink.",
                var_labels: ["sink_id", "worker_id"],
            )),
            bytes_staged: registry.register(metric!(
                name: "mz_sink_bytes_staged",
                help: "The number of bytes staged but possibly not committed to the sink.",
                var_labels: ["sink_id", "worker_id"],
            )),
            bytes_committed: registry.register(metric!(
                name: "mz_sink_bytes_committed",
                help: "The number of bytes committed to the sink.",
                var_labels: ["sink_id", "worker_id"],
            )),
        }
    }
}

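/// Prometheus metrics for a single sink and worker.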
#[derive(Debug)]
pub struct SinkStatisticsMetrics {
    pub(crate) messages_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) messages_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) bytes_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) bytes_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
}

impl SinkStatisticsMetrics {
    pub(crate) fn new(
        defs: &SinkStatisticsMetricDefs,
        id: GlobalId,
        worker_id: usize,
    ) -> SinkStatisticsMetrics {
        SinkStatisticsMetrics {
            messages_staged: defs
                .messages_staged
                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
            messages_committed: defs
                .messages_committed
                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
            bytes_staged: defs
                .bytes_staged
                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
            bytes_committed: defs
                .bytes_committed
                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
        }
    }
}

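/// Metadata needed to maintain source statistics.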
#[derive(Debug, Clone)]
pub struct SourceStatisticsMetadata {
    /// The resumption frontier of the source at creation time.
    resume_upper: Antichain<Timestamp>,
    /// The time at which this metadata was created.
    created_at: Instant,
}

impl SourceStatisticsMetadata {
    /// Create a new `SourceStatisticsMetadata`.
    pub fn new(resume_upper: Antichain<Timestamp>) -> Self {
        Self {
            resume_upper,
            created_at: Instant::now(),
        }
    }
}

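/// The in-memory statistics record paired with its Prometheus mirror; the two
/// are always updated in lockstep.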
#[derive(Debug)]
struct StatsInner<Stats, Metrics> {
    stats: Stats,
    prom: Metrics,
}

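/// A shared handle to the statistics for a storage object. Cloning is cheap:
/// all clones update the same underlying `StatsInner` through an
/// `Rc<RefCell<_>>`, plus a per-handle `Meta` value.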
#[derive(Debug)]
pub struct StorageStatistics<Stats, Metrics, Meta> {
    stats: Rc<RefCell<StatsInner<Stats, Metrics>>>,
    meta: Meta,
}

impl<Stats, Metrics, Meta: Clone> Clone for StorageStatistics<Stats, Metrics, Meta> {
    fn clone(&self) -> Self {
        Self {
            stats: Rc::clone(&self.stats),
            meta: self.meta.clone(),
        }
    }
}

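/// A snapshot of the counters and gauges of a source, as accumulated on one
/// worker since the last snapshot. Counters are deltas; gauges are current
/// values. An outer `Option` marks whether a gauge has been initialized since
/// the last reset (a snapshot is only emitted once all gauges are
/// initialized; see `SourceStatistics::snapshot`), and an inner `Option`
/// distinguishes "initialized but no value reported yet" from an actual
/// value.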
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SourceStatisticsRecord {
    id: GlobalId,
    worker_id: usize,
    // Counters.
    messages_received: u64,
    bytes_received: u64,
    updates_staged: u64,
    updates_committed: u64,

    // Gauges.
    records_indexed: Option<u64>,
    bytes_indexed: Option<u64>,

    rehydration_latency_ms: Option<Option<i64>>,

    snapshot_records_known: Option<Option<u64>>,
    snapshot_records_staged: Option<Option<u64>>,
    snapshot_committed: Option<bool>,
    offset_known: Option<Option<u64>>,
    offset_committed: Option<Option<u64>>,

    // A gauge that is tracked operationally but not reported upstream.
    envelope_state_tombstones: u64,
}

impl SourceStatisticsRecord {
    /// Reset all gauges to their initial values; counters are untouched.
    fn reset_gauges(&mut self) {
        // These gauges must be re-reported before the record can be
        // snapshotted again.
        self.rehydration_latency_ms = None;
        self.snapshot_committed = None;

        // These gauges are always considered initialized.
        self.bytes_indexed = Some(0);
        self.records_indexed = Some(0);

        // These gauges are initialized, but carry no value until the source
        // reports one.
        self.snapshot_records_known = Some(None);
        self.snapshot_records_staged = Some(None);
        self.offset_known = Some(None);
        self.offset_committed = Some(None);

        self.envelope_state_tombstones = 0;
    }

    /// Reset all counters to 0; gauges are untouched.
    fn reset_counters(&mut self) {
        self.messages_received = 0;
        self.bytes_received = 0;
        self.updates_staged = 0;
        self.updates_committed = 0;
    }

    /// Convert this record into a `SourceStatisticsUpdate` to be reported
    /// upstream. All gauges must be initialized (this is guaranteed by
    /// `SourceStatistics::snapshot`), hence the `unwrap`s.
    fn as_update(&self) -> SourceStatisticsUpdate {
        let SourceStatisticsRecord {
            id,
            worker_id: _,
            messages_received,
            bytes_received,
            updates_staged,
            updates_committed,
            records_indexed,
            bytes_indexed,
            rehydration_latency_ms,
            snapshot_records_known,
            snapshot_records_staged,
            snapshot_committed,
            offset_known,
            offset_committed,
            envelope_state_tombstones: _,
        } = self.clone();

        SourceStatisticsUpdate {
            id,
            messages_received: messages_received.into(),
            bytes_received: bytes_received.into(),
            updates_staged: updates_staged.into(),
            updates_committed: updates_committed.into(),
            records_indexed: Gauge::gauge(records_indexed.unwrap()),
            bytes_indexed: Gauge::gauge(bytes_indexed.unwrap()),
            rehydration_latency_ms: Gauge::gauge(rehydration_latency_ms.unwrap()),
            snapshot_records_known: Gauge::gauge(snapshot_records_known.unwrap()),
            snapshot_records_staged: Gauge::gauge(snapshot_records_staged.unwrap()),
            snapshot_committed: Gauge::gauge(snapshot_committed.unwrap()),
            offset_known: Gauge::gauge(offset_known.unwrap()),
            offset_committed: Gauge::gauge(offset_committed.unwrap()),
        }
    }
}

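/// A snapshot of the counters of a sink, as accumulated on one worker since
/// the last snapshot. Sinks currently report no gauges.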
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SinkStatisticsRecord {
    id: GlobalId,
    worker_id: usize,
    // Counters.
    messages_staged: u64,
    messages_committed: u64,
    bytes_staged: u64,
    bytes_committed: u64,
}

impl SinkStatisticsRecord {
    /// Reset all gauges; a no-op, as sinks currently have none.
    fn reset_gauges(&self) {}

    /// Reset all counters to 0.
    fn reset_counters(&mut self) {
        self.messages_staged = 0;
        self.messages_committed = 0;
        self.bytes_staged = 0;
        self.bytes_committed = 0;
    }

    /// Convert this record into a `SinkStatisticsUpdate` to be reported
    /// upstream.
    fn as_update(&self) -> SinkStatisticsUpdate {
        let SinkStatisticsRecord {
            id,
            worker_id: _,
            messages_staged,
            messages_committed,
            bytes_staged,
            bytes_committed,
        } = self.clone();

        SinkStatisticsUpdate {
            id,
            messages_staged: messages_staged.into(),
            messages_committed: messages_committed.into(),
            bytes_staged: bytes_staged.into(),
            bytes_committed: bytes_committed.into(),
        }
    }
}

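/// Statistics maintained for sources.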
pub type SourceStatistics =
    StorageStatistics<SourceStatisticsRecord, SourceStatisticsMetrics, SourceStatisticsMetadata>;

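/// Statistics maintained for sinks.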
pub type SinkStatistics = StorageStatistics<SinkStatisticsRecord, SinkStatisticsMetrics, ()>;

impl SourceStatistics {
    pub(crate) fn new(
        id: GlobalId,
        worker_id: usize,
        metrics: &SourceStatisticsMetricDefs,
        parent_source_id: GlobalId,
        shard_id: &mz_persist_client::ShardId,
        envelope: SourceEnvelope,
        resume_upper: Antichain<Timestamp>,
    ) -> Self {
        Self {
            stats: Rc::new(RefCell::new(StatsInner {
                stats: SourceStatisticsRecord {
                    id,
                    worker_id,
                    messages_received: 0,
                    updates_staged: 0,
                    updates_committed: 0,
                    bytes_received: 0,
                    records_indexed: Some(0),
                    bytes_indexed: Some(0),
                    rehydration_latency_ms: None,
                    snapshot_records_staged: Some(None),
                    snapshot_records_known: Some(None),
                    snapshot_committed: None,
                    offset_known: Some(None),
                    offset_committed: Some(None),
                    envelope_state_tombstones: 0,
                },
                prom: SourceStatisticsMetrics::new(
                    metrics,
                    id,
                    worker_id,
                    parent_source_id,
                    shard_id,
                    envelope,
                ),
            })),
            meta: SourceStatisticsMetadata::new(resume_upper),
        }
    }

    /// Reset the gauges to their initial values; counters are unaffected.
    pub fn reset_gauges(&self) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.reset_gauges();
    }

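    /// Take a snapshot of the data for this worker, returning it and
    /// resetting the counters. Returns `None` (and leaves the counters
    /// untouched) until every gauge has been initialized.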
    pub fn snapshot(&self) -> Option<SourceStatisticsRecord> {
        let mut cur = self.stats.borrow_mut();

        match &cur.stats {
            SourceStatisticsRecord {
                records_indexed: Some(_),
                bytes_indexed: Some(_),
                rehydration_latency_ms: Some(_),
                snapshot_records_known: Some(_),
                snapshot_records_staged: Some(_),
                snapshot_committed: Some(_),
                offset_known: Some(_),
                offset_committed: Some(_),
                ..
            } => {
                let ret = Some(cur.stats.clone());
                cur.stats.reset_counters();
                ret
            }
            _ => None,
        }
    }

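    /// Initialize the `snapshot_committed` gauge from the source's
    /// resumption frontier, so that a source resuming past the minimum
    /// timestamp immediately reports its snapshot as committed.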
    pub fn initialize_snapshot_committed(&self, upper: &Antichain<Timestamp>) {
        self.update_snapshot_committed(upper);
    }

    /// Set the `snapshot_committed` flag: the snapshot counts as committed
    /// once the upper has advanced past the minimum timestamp.
    pub fn update_snapshot_committed(&self, upper: &Antichain<Timestamp>) {
        let value = *upper != Antichain::from_elem(Timestamp::MIN);
        let mut cur = self.stats.borrow_mut();
        cur.stats.snapshot_committed = Some(value);
        cur.prom.snapshot_committed.set(if value { 1 } else { 0 });
    }

    pub fn inc_messages_received_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.messages_received += value;
        cur.prom.messages_received.inc_by(value);
    }

    pub fn inc_updates_staged_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.updates_staged += value;
        cur.prom.updates_staged.inc_by(value);
    }

    pub fn inc_updates_committed_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.updates_committed += value;
        cur.prom.updates_committed.inc_by(value);
    }

    pub fn inc_bytes_received_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.bytes_received += value;
        cur.prom.bytes_received.inc_by(value);
    }

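    /// Update the `bytes_indexed` gauge by a signed delta, clamping to 0 (and
    /// logging a warning) on `u64` overflow or underflow.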
    pub fn update_bytes_indexed_by(&self, value: i64) {
        let mut cur = self.stats.borrow_mut();
        if let Some(updated) = cur
            .stats
            .bytes_indexed
            .unwrap_or(0)
            .checked_add_signed(value)
        {
            cur.stats.bytes_indexed = Some(updated);
            cur.prom.bytes_indexed.set(updated);
        } else {
            let bytes_indexed = cur.stats.bytes_indexed.unwrap_or(0);
            tracing::warn!(
                "Unexpected u64 overflow while updating bytes_indexed value {} with {}",
                bytes_indexed,
                value
            );
            cur.stats.bytes_indexed = Some(0);
            cur.prom.bytes_indexed.set(0);
        }
    }

    /// Set `bytes_indexed` to an absolute value, clamping negative inputs to
    /// 0 with a warning.
    pub fn set_bytes_indexed(&self, value: i64) {
        let mut cur = self.stats.borrow_mut();
        let value = if value < 0 {
            tracing::warn!("Unexpected negative value for bytes_indexed {}", value);
            0
        } else {
            value.unsigned_abs()
        };
        cur.stats.bytes_indexed = Some(value);
        cur.prom.bytes_indexed.set(value);
    }

    /// Update the `records_indexed` gauge by a signed delta, clamping to 0
    /// (and logging a warning) on `u64` overflow or underflow.
    pub fn update_records_indexed_by(&self, value: i64) {
        let mut cur = self.stats.borrow_mut();
        if let Some(updated) = cur
            .stats
            .records_indexed
            .unwrap_or(0)
            .checked_add_signed(value)
        {
            cur.stats.records_indexed = Some(updated);
            cur.prom.records_indexed.set(updated);
        } else {
            let records_indexed = cur.stats.records_indexed.unwrap_or(0);
            tracing::warn!(
                "Unexpected u64 overflow while updating records_indexed value {} with {}",
                records_indexed,
                value
            );
            cur.stats.records_indexed = Some(0);
            cur.prom.records_indexed.set(0);
        }
    }

    /// Set `records_indexed` to an absolute value, clamping negative inputs
    /// to 0 with a warning.
    pub fn set_records_indexed(&self, value: i64) {
        let mut cur = self.stats.borrow_mut();
        let value = if value < 0 {
            tracing::warn!("Unexpected negative value for records_indexed {}", value);
            0
        } else {
            value.unsigned_abs()
        };
        cur.stats.records_indexed = Some(value);
        cur.prom.records_indexed.set(value);
    }

    /// Mark the rehydration latency gauge as initialized, with no value
    /// measured yet.
    pub fn initialize_rehydration_latency_ms(&self) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.rehydration_latency_ms = Some(None);
    }

    /// Update the `envelope_state_tombstones` gauge by a signed delta,
    /// clamping to 0 (and logging a warning) on `u64` overflow or underflow.
    pub fn update_envelope_state_tombstones_by(&self, value: i64) {
        let mut cur = self.stats.borrow_mut();
        if let Some(updated) = cur
            .stats
            .envelope_state_tombstones
            .checked_add_signed(value)
        {
            cur.stats.envelope_state_tombstones = updated;
            cur.prom.envelope_state_tombstones.set(updated);
        } else {
            let envelope_state_tombstones = cur.stats.envelope_state_tombstones;
            tracing::warn!(
                "Unexpected u64 overflow while updating envelope_state_tombstones value {} with {}",
                envelope_state_tombstones,
                value
            );
            cur.stats.envelope_state_tombstones = 0;
            cur.prom.envelope_state_tombstones.set(0);
        }
    }

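    /// Record the rehydration latency, once: the first time the frontier
    /// passes the resumption frontier captured at creation, the elapsed time
    /// since creation is recorded; later calls are no-ops.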
    pub fn update_rehydration_latency_ms(&self, upper: &Antichain<Timestamp>) {
        let mut cur = self.stats.borrow_mut();

        // Record the latency only once.
        if matches!(cur.stats.rehydration_latency_ms, Some(Some(_))) {
            return;
        }
        // Rehydration is not complete until the frontier passes the
        // resumption frontier captured when the source was (re)started.
        if !PartialOrder::less_than(&self.meta.resume_upper, upper) {
            return;
        }

        let elapsed = self.meta.created_at.elapsed();
        let value = elapsed
            .as_millis()
            .try_into()
            .expect("Rehydration took more than ~292 million years!");
        cur.stats.rehydration_latency_ms = Some(Some(value));
        cur.prom.rehydration_latency_ms.set(value);
    }

781
782 pub fn set_offset_known(&self, value: u64) {
784 let mut cur = self.stats.borrow_mut();
785 cur.stats.offset_known = Some(Some(value));
786 cur.prom.offset_known.set(value);
787 }
788
789 pub fn set_offset_committed(&self, value: u64) {
791 let mut cur = self.stats.borrow_mut();
792 cur.stats.offset_committed = Some(Some(value));
793 cur.prom.offset_committed.set(value);
794 }
795
796 pub fn set_snapshot_records_known(&self, value: u64) {
798 let mut cur = self.stats.borrow_mut();
799 cur.stats.snapshot_records_known = Some(Some(value));
800 cur.prom.snapshot_records_known.set(value);
801 }
802
803 pub fn set_snapshot_records_staged(&self, value: u64) {
805 let mut cur = self.stats.borrow_mut();
806 cur.stats.snapshot_records_staged = Some(Some(value));
807 cur.prom.snapshot_records_staged.set(value);
808 }
809}
810
impl SinkStatistics {
    pub(crate) fn new(id: GlobalId, worker_id: usize, metrics: &SinkStatisticsMetricDefs) -> Self {
        Self {
            stats: Rc::new(RefCell::new(StatsInner {
                stats: SinkStatisticsRecord {
                    id,
                    worker_id,
                    messages_staged: 0,
                    messages_committed: 0,
                    bytes_staged: 0,
                    bytes_committed: 0,
                },
                prom: SinkStatisticsMetrics::new(metrics, id, worker_id),
            })),
            meta: (),
        }
    }

    /// Reset the gauges; currently a no-op, as sinks have no gauges.
    pub fn reset_gauges(&self) {
        let cur = self.stats.borrow_mut();
        cur.stats.reset_gauges();
    }

    /// Take a snapshot of the counters for this worker, resetting them.
    /// Unlike sources, sinks have no gauges to initialize, so a snapshot is
    /// always available.
    pub fn snapshot(&self) -> Option<SinkStatisticsRecord> {
        let mut cur = self.stats.borrow_mut();

        match &cur.stats {
            SinkStatisticsRecord { .. } => {
                let ret = Some(cur.stats.clone());
                cur.stats.reset_counters();
                ret
            }
        }
    }

    pub fn inc_messages_staged_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.messages_staged += value;
        cur.prom.messages_staged.inc_by(value);
    }

    pub fn inc_bytes_staged_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.bytes_staged += value;
        cur.prom.bytes_staged.inc_by(value);
    }

    pub fn inc_messages_committed_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.messages_committed += value;
        cur.prom.messages_committed.inc_by(value);
    }

    pub fn inc_bytes_committed_by(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.bytes_committed += value;
        cur.prom.bytes_committed.inc_by(value);
    }
}

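/// A bundle of statistics for all the sources and sinks running on this
/// worker, plus (on worker 0 only) the cluster-wide aggregates.
///
/// Each worker updates its local statistics and periodically emits them (see
/// `emit_local`); worker 0 ingests those reports and folds them into the
/// global state, which `snapshot` summarizes for the controller. Epoch
/// counters guard against restarted dataflows: a report carrying a stale
/// epoch may only contribute its counters, since its gauge values may
/// describe a previous incarnation of the object.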
pub struct AggregatedStatistics {
    worker_id: usize,
    worker_count: usize,
    /// Statistics for the objects running on this worker. Sources also carry
    /// the id of their parent ingestion. The `usize` is a local epoch, bumped
    /// each time the object is re-initialized.
    local_source_statistics: BTreeMap<GlobalId, (usize, GlobalId, SourceStatistics)>,
    local_sink_statistics: BTreeMap<GlobalId, (usize, SinkStatistics)>,

    /// Populated on worker 0 only: the latest update reported by each worker,
    /// guarded by a global epoch used to discard stale gauge reports.
    global_source_statistics:
        BTreeMap<GlobalId, (usize, GlobalId, Vec<Option<SourceStatisticsUpdate>>)>,
    global_sink_statistics: BTreeMap<GlobalId, (usize, Vec<Option<SinkStatisticsUpdate>>)>,
}

impl AggregatedStatistics {
    /// Create a new `AggregatedStatistics`.
    pub fn new(worker_id: usize, worker_count: usize) -> Self {
        AggregatedStatistics {
            worker_id,
            worker_count,
            local_source_statistics: Default::default(),
            local_sink_statistics: Default::default(),
            global_source_statistics: Default::default(),
            global_sink_statistics: Default::default(),
        }
    }

    /// Return the statistics objects for all sources that belong to the given
    /// ingestion.
    pub fn get_ingestion_stats(
        &self,
        ingestion_id: &GlobalId,
    ) -> BTreeMap<GlobalId, SourceStatistics> {
        let mut ingestion_stats = BTreeMap::new();
        for (id, (_epoch, ingestion_id2, stats)) in self.local_source_statistics.iter() {
            if ingestion_id == ingestion_id2 {
                ingestion_stats.insert(id.clone(), stats.clone());
            }
        }
        ingestion_stats
    }

    /// Get a `SourceStatistics` for an id, if it exists.
    pub fn get_source(&self, id: &GlobalId) -> Option<&SourceStatistics> {
        self.local_source_statistics.get(id).map(|(_, _, s)| s)
    }

    /// Get a `SinkStatistics` for an id, if it exists.
    pub fn get_sink(&self, id: &GlobalId) -> Option<&SinkStatistics> {
        self.local_sink_statistics.get(id).map(|(_, s)| s)
    }

    /// Deinitialize an object, dropping all local and global state for it.
    pub fn deinitialize(&mut self, id: GlobalId) {
        self.local_source_statistics.remove(&id);
        self.local_sink_statistics.remove(&id);
        self.global_source_statistics.remove(&id);
        self.global_sink_statistics.remove(&id);
    }

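    /// Advance the global epoch for `id` and reset the gauges of all
    /// previously reported updates, so that reports from before a restart
    /// cannot contribute stale gauge values.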
    pub fn advance_global_epoch(&mut self, id: GlobalId) {
        if let Some((epoch, _ingestion_id, stats)) = self.global_source_statistics.get_mut(&id) {
            *epoch += 1;
            for update in stats.iter_mut().flatten() {
                update.reset_gauges();
            }
        }
        if let Some((epoch, stats)) = self.global_sink_statistics.get_mut(&id) {
            *epoch += 1;
            for update in stats.iter_mut().flatten() {
                update.reset_gauges();
            }
        }
    }

    /// (Re-)initialize the statistics for the source `id`. If already
    /// present, its epoch is advanced and its gauges are reset; otherwise
    /// `stats` is invoked to build a fresh object.
    pub fn initialize_source<F: FnOnce() -> SourceStatistics>(
        &mut self,
        id: GlobalId,
        ingestion_id: GlobalId,
        resume_upper: Antichain<Timestamp>,
        stats: F,
    ) {
        self.local_source_statistics
            .entry(id)
            .and_modify(|(epoch, ingestion_id2, stats)| {
                assert_eq!(ingestion_id, *ingestion_id2);
                *epoch += 1;
                stats.reset_gauges();
                stats.meta = SourceStatisticsMetadata::new(resume_upper);
            })
            .or_insert_with(|| (0, ingestion_id, stats()));

        if self.worker_id == 0 {
            self.global_source_statistics
                .entry(id)
                .or_insert_with(|| (0, ingestion_id, vec![None; self.worker_count]));
        }
    }

    /// (Re-)initialize the statistics for the sink `id`, analogously to
    /// `initialize_source`.
    pub fn initialize_sink<F: FnOnce() -> SinkStatistics>(&mut self, id: GlobalId, stats: F) {
        self.local_sink_statistics
            .entry(id)
            .and_modify(|(epoch, stats)| {
                *epoch += 1;
                stats.reset_gauges();
            })
            .or_insert_with(|| (0, stats()));
        if self.worker_id == 0 {
            self.global_sink_statistics
                .entry(id)
                .or_insert_with(|| (0, vec![None; self.worker_count]));
        }
    }

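    /// Ingest statistics reports from other workers. Only worker 0 maintains
    /// global state, so this is a no-op elsewhere.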
    pub fn ingest(
        &mut self,
        source_statistics: Vec<(usize, SourceStatisticsRecord)>,
        sink_statistics: Vec<(usize, SinkStatisticsRecord)>,
    ) {
        // Currently, only worker 0 ingests statistics from other workers.
        if self.worker_id != 0 {
            return;
        }

        for (epoch, stat) in source_statistics {
            if let Some((global_epoch, _, stats)) = self.global_source_statistics.get_mut(&stat.id)
            {
                // A report from an older epoch may only contribute its
                // counters; its gauges may describe a previous incarnation of
                // the object, so they are reset instead.
                let epoch_match = epoch >= *global_epoch;
                let mut update = stat.as_update();
                match (&mut stats[stat.worker_id], epoch_match) {
                    (None, true) => stats[stat.worker_id] = Some(update),
                    (None, false) => {
                        update.reset_gauges();
                        stats[stat.worker_id] = Some(update);
                    }
                    (Some(occupied), true) => occupied.incorporate(update),
                    (Some(occupied), false) => occupied.incorporate_counters(update),
                }
            }
        }

        for (epoch, stat) in sink_statistics {
            if let Some((global_epoch, stats)) = self.global_sink_statistics.get_mut(&stat.id) {
                let epoch_match = epoch >= *global_epoch;
                let mut update = stat.as_update();
                match (&mut stats[stat.worker_id], epoch_match) {
                    (None, true) => stats[stat.worker_id] = Some(update),
                    (None, false) => {
                        update.reset_gauges();
                        stats[stat.worker_id] = Some(update);
                    }
                    (Some(occupied), true) => occupied.incorporate(update),
                    (Some(occupied), false) => occupied.incorporate_counters(update),
                }
            }
        }
    }

    /// Produce the current local snapshots for this worker.
    fn _emit_local(
        &mut self,
    ) -> (
        Vec<(usize, SourceStatisticsRecord)>,
        Vec<(usize, SinkStatisticsRecord)>,
    ) {
        let sources = self
            .local_source_statistics
            .values_mut()
            .flat_map(|(epoch, _, s)| s.snapshot().map(|v| (*epoch, v)))
            .collect();

        let sinks = self
            .local_sink_statistics
            .values_mut()
            .flat_map(|(epoch, s)| s.snapshot().map(|v| (*epoch, v)))
            .collect();

        (sources, sinks)
    }

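    /// Emit the local statistics to be broadcast to worker 0. Worker 0 itself
    /// returns nothing here; its local statistics are folded into the global
    /// state directly in `snapshot`.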
    pub fn emit_local(
        &mut self,
    ) -> (
        Vec<(usize, SourceStatisticsRecord)>,
        Vec<(usize, SinkStatisticsRecord)>,
    ) {
        if self.worker_id == 0 {
            return (Vec::new(), Vec::new());
        }

        self._emit_local()
    }

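    /// Produce cluster-wide summaries for the controller, resetting the
    /// per-worker counters that have been incorporated. A summary for an
    /// object is emitted only once every worker has reported at least once.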
    pub fn snapshot(&mut self) -> (Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>) {
        // Only worker 0 maintains the global state this summarizes.
        if self.worker_id != 0 {
            return (Vec::new(), Vec::new());
        }

        let (sources, sinks) = self._emit_local();
        self.ingest(sources, sinks);

        let sources = self
            .global_source_statistics
            .iter_mut()
            .filter_map(|(_, (_, _, s))| {
                if s.iter().all(|s| s.is_some()) {
                    let ret = Some(SourceStatisticsUpdate::summarize(|| {
                        s.iter().filter_map(Option::as_ref)
                    }));

                    // Reset the counters, which have now been incorporated
                    // into the summary, so they are not double-counted.
                    s.iter_mut().for_each(|s| {
                        if let Some(s) = s {
                            s.reset_counters();
                        }
                    });
                    ret
                } else {
                    None
                }
            })
            .collect();

        let sinks = self
            .global_sink_statistics
            .iter_mut()
            .filter_map(|(_, (_, s))| {
                if s.iter().all(|s| s.is_some()) {
                    let ret = Some(SinkStatisticsUpdate::summarize(|| {
                        s.iter().filter_map(Option::as_ref)
                    }));

                    // Reset the counters, which have now been incorporated
                    // into the summary, so they are not double-counted.
                    s.iter_mut().for_each(|s| {
                        if let Some(s) = s {
                            s.reset_counters();
                        }
                    });
                    ret
                } else {
                    None
                }
            })
            .collect();

        (sources, sinks)
    }
}
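
// The following is an illustrative sketch, not part of the original module:
// it exercises the sink counter/snapshot contract described above. It assumes
// `MetricsRegistry::new()` (mz_ore) and `GlobalId::User` (mz_repr) are
// available, as they are elsewhere in this crate.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn sink_snapshot_resets_counters() {
        let registry = MetricsRegistry::new();
        let defs = SinkStatisticsMetricDefs::register_with(&registry);
        let stats = SinkStatistics::new(GlobalId::User(1), /* worker_id */ 0, &defs);

        stats.inc_messages_staged_by(3);
        stats.inc_bytes_staged_by(256);
        stats.inc_messages_committed_by(3);

        // Sinks have no gauges, so a snapshot is always available; it carries
        // the counters accumulated since the previous snapshot.
        let record = stats.snapshot().expect("sink snapshots are always available");
        assert_eq!(record.messages_staged, 3);
        assert_eq!(record.bytes_staged, 256);
        assert_eq!(record.messages_committed, 3);

        // Taking a snapshot resets the counters.
        let record = stats.snapshot().expect("sink snapshots are always available");
        assert_eq!(record.messages_staged, 0);
        assert_eq!(record.bytes_committed, 0);
    }
}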