1use std::cell::RefCell;
26use std::collections::BTreeMap;
27use std::rc::Rc;
28use std::time::Instant;
29
30use mz_ore::metric;
31use mz_ore::metrics::{
32 DeleteOnDropCounter, DeleteOnDropGauge, IntCounterVec, IntGaugeVec, MetricTag,
33 MetricVisibility, MetricsRegistry, UIntGaugeVec,
34};
35use mz_repr::{GlobalId, Timestamp};
36use mz_storage_client::statistics::{Gauge, SinkStatisticsUpdate, SourceStatisticsUpdate};
37use mz_storage_types::sources::SourceEnvelope;
38use prometheus::core::{AtomicI64, AtomicU64};
39use serde::{Deserialize, Serialize};
40use timely::PartialOrder;
41use timely::progress::frontier::Antichain;
42
43#[derive(Clone, Debug)]
47pub(crate) struct SourceStatisticsMetricDefs {
48 pub(crate) messages_received: IntCounterVec,
50 pub(crate) updates_staged: IntCounterVec,
51 pub(crate) updates_committed: IntCounterVec,
52 pub(crate) bytes_received: IntCounterVec,
53
54 pub(crate) snapshot_committed: UIntGaugeVec,
56 pub(crate) bytes_indexed: UIntGaugeVec,
57 pub(crate) records_indexed: UIntGaugeVec,
58 pub(crate) rehydration_latency_ms: IntGaugeVec,
59
60 pub(crate) offset_known: UIntGaugeVec,
61 pub(crate) offset_committed: UIntGaugeVec,
62 pub(crate) snapshot_records_known: UIntGaugeVec,
63 pub(crate) snapshot_records_staged: UIntGaugeVec,
64
65 pub(crate) envelope_state_tombstones: UIntGaugeVec,
67}
68
69impl SourceStatisticsMetricDefs {
70 pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
71 Self {
72 snapshot_committed: registry.register(metric!(
73 name: "mz_source_snapshot_committed",
74 help: "Whether or not the worker has committed the initial snapshot for a source.",
75 var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
76 )),
77 messages_received: registry.register(metric!(
78 name: "mz_source_messages_received",
79 help: "The number of raw messages the worker has received from upstream.",
80 var_labels: ["source_id", "worker_id", "parent_source_id"],
81 visibility: MetricVisibility::Public,
82 tags: [MetricTag::Source],
83 )),
84 updates_staged: registry.register(metric!(
85 name: "mz_source_updates_staged",
86 help: "The number of updates (inserts + deletes) the worker has written but not yet committed to the storage layer.",
87 var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
88 )),
89 updates_committed: registry.register(metric!(
90 name: "mz_source_updates_committed",
91 help: "The number of updates (inserts + deletes) the worker has committed into the storage layer.",
92 var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
93 )),
94 bytes_received: registry.register(metric!(
95 name: "mz_source_bytes_received",
96 help: "The number of bytes worth of messages the worker has received from upstream. The way the bytes are counted is source-specific.",
97 var_labels: ["source_id", "worker_id", "parent_source_id"],
98 visibility: MetricVisibility::Public,
99 tags: [MetricTag::Source],
100 )),
101 bytes_indexed: registry.register(metric!(
102 name: "mz_source_bytes_indexed",
103 help: "The number of bytes of the source envelope state kept. This will be specific to the envelope in use.",
104 var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
105 )),
106 records_indexed: registry.register(metric!(
107 name: "mz_source_records_indexed",
108 help: "The number of records in the source envelope state. This will be specific to the envelope in use",
109 var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
110 )),
111 envelope_state_tombstones: registry.register(metric!(
112 name: "mz_source_envelope_state_tombstones",
113 help: "The number of outstanding tombstones in the source envelope state. This will be specific to the envelope in use",
114 var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
115 )),
116 rehydration_latency_ms: registry.register(metric!(
117 name: "mz_source_rehydration_latency_ms",
118 help: "The amount of time in milliseconds it took for the worker to rehydrate the source envelope state. This will be specific to the envelope in use.",
119 var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id", "envelope"],
120 )),
121 offset_known: registry.register(metric!(
122 name: "mz_source_offset_known",
123 help: "The total number of _values_ (source-defined unit) present in upstream.",
124 var_labels: ["source_id", "worker_id", "shard_id"],
125 visibility: MetricVisibility::Public,
126 tags: [MetricTag::Source],
127 )),
128 offset_committed: registry.register(metric!(
129 name: "mz_source_offset_committed",
130 help: "The total number of _values_ (source-defined unit) we have fully processed, and storage and committed.",
131 var_labels: ["source_id", "worker_id", "shard_id"],
132 visibility: MetricVisibility::Public,
133 tags: [MetricTag::Source],
134 )),
135 snapshot_records_known: registry.register(metric!(
136 name: "mz_source_snapshot_records_known",
137 help: "The total number of records in the source's snapshot",
138 var_labels: ["source_id", "worker_id", "shard_id", "parent_source_id"],
139 )),
140 snapshot_records_staged: registry.register(metric!(
141 name: "mz_source_snapshot_records_staged",
142 help: "The total number of records read from the source's snapshot",
143 var_labels: ["source_id", "worker_id", "shard_id", "parent_source_id"],
144 )),
145 }
146 }
147}
148
149#[derive(Debug)]
151pub struct SourceStatisticsMetrics {
152 pub(crate) messages_received: DeleteOnDropCounter<AtomicU64, Vec<String>>,
154 pub(crate) updates_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
155 pub(crate) updates_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
156 pub(crate) bytes_received: DeleteOnDropCounter<AtomicU64, Vec<String>>,
157
158 pub(crate) snapshot_committed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
160 pub(crate) bytes_indexed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
161 pub(crate) records_indexed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
162 pub(crate) rehydration_latency_ms: DeleteOnDropGauge<AtomicI64, Vec<String>>,
163
164 pub(crate) offset_known: DeleteOnDropGauge<AtomicU64, Vec<String>>,
165 pub(crate) offset_committed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
166 pub(crate) snapshot_records_known: DeleteOnDropGauge<AtomicU64, Vec<String>>,
167 pub(crate) snapshot_records_staged: DeleteOnDropGauge<AtomicU64, Vec<String>>,
168
169 pub(crate) envelope_state_tombstones: DeleteOnDropGauge<AtomicU64, Vec<String>>,
171}
172
173impl SourceStatisticsMetrics {
174 pub(crate) fn new(
175 defs: &SourceStatisticsMetricDefs,
176 id: GlobalId,
177 worker_id: usize,
178 parent_source_id: GlobalId,
179 shard_id: &mz_persist_client::ShardId,
180 envelope: SourceEnvelope,
181 ) -> SourceStatisticsMetrics {
182 let shard = shard_id.to_string();
183 let envelope = match envelope {
184 SourceEnvelope::None(_) => "none",
185 SourceEnvelope::Upsert(_) => "upsert",
186 SourceEnvelope::CdcV2 => "cdcv2",
187 };
188
189 SourceStatisticsMetrics {
190 snapshot_committed: defs.snapshot_committed.get_delete_on_drop_metric(vec![
191 id.to_string(),
192 worker_id.to_string(),
193 parent_source_id.to_string(),
194 shard.clone(),
195 ]),
196 messages_received: defs.messages_received.get_delete_on_drop_metric(vec![
197 id.to_string(),
198 worker_id.to_string(),
199 parent_source_id.to_string(),
200 ]),
201 updates_staged: defs.updates_staged.get_delete_on_drop_metric(vec![
202 id.to_string(),
203 worker_id.to_string(),
204 parent_source_id.to_string(),
205 shard.clone(),
206 ]),
207 updates_committed: defs.updates_committed.get_delete_on_drop_metric(vec![
208 id.to_string(),
209 worker_id.to_string(),
210 parent_source_id.to_string(),
211 shard.clone(),
212 ]),
213 bytes_received: defs.bytes_received.get_delete_on_drop_metric(vec![
214 id.to_string(),
215 worker_id.to_string(),
216 parent_source_id.to_string(),
217 ]),
218 bytes_indexed: defs.bytes_indexed.get_delete_on_drop_metric(vec![
219 id.to_string(),
220 worker_id.to_string(),
221 parent_source_id.to_string(),
222 shard.clone(),
223 ]),
224 records_indexed: defs.records_indexed.get_delete_on_drop_metric(vec![
225 id.to_string(),
226 worker_id.to_string(),
227 parent_source_id.to_string(),
228 shard.clone(),
229 ]),
230 envelope_state_tombstones: defs.envelope_state_tombstones.get_delete_on_drop_metric(
231 vec![
232 id.to_string(),
233 worker_id.to_string(),
234 parent_source_id.to_string(),
235 shard.clone(),
236 ],
237 ),
238 rehydration_latency_ms: defs.rehydration_latency_ms.get_delete_on_drop_metric(vec![
239 id.to_string(),
240 worker_id.to_string(),
241 parent_source_id.to_string(),
242 shard.clone(),
243 envelope.to_string(),
244 ]),
245 offset_known: defs.offset_known.get_delete_on_drop_metric(vec![
246 id.to_string(),
247 worker_id.to_string(),
248 shard.clone(),
249 ]),
250 offset_committed: defs.offset_committed.get_delete_on_drop_metric(vec![
251 id.to_string(),
252 worker_id.to_string(),
253 shard.clone(),
254 ]),
255 snapshot_records_known: defs.snapshot_records_known.get_delete_on_drop_metric(vec![
256 id.to_string(),
257 worker_id.to_string(),
258 shard.clone(),
259 parent_source_id.to_string(),
260 ]),
261 snapshot_records_staged: defs.snapshot_records_staged.get_delete_on_drop_metric(vec![
262 id.to_string(),
263 worker_id.to_string(),
264 shard.clone(),
265 parent_source_id.to_string(),
266 ]),
267 }
268 }
269}
270
271#[derive(Clone, Debug)]
272pub(crate) struct SinkStatisticsMetricDefs {
273 pub(crate) messages_staged: IntCounterVec,
275 pub(crate) messages_committed: IntCounterVec,
276 pub(crate) bytes_staged: IntCounterVec,
277 pub(crate) bytes_committed: IntCounterVec,
278}
279
280impl SinkStatisticsMetricDefs {
281 pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
282 Self {
283 messages_staged: registry.register(metric!(
284 name: "mz_sink_messages_staged",
285 help: "The number of messages staged but possibly not committed to the sink.",
286 var_labels: ["sink_id", "worker_id"],
287 )),
288 messages_committed: registry.register(metric!(
289 name: "mz_sink_messages_committed",
290 help: "The number of messages committed to the sink.",
291 var_labels: ["sink_id", "worker_id"],
292 )),
293 bytes_staged: registry.register(metric!(
294 name: "mz_sink_bytes_staged",
295 help: "The number of bytes staged but possibly not committed to the sink.",
296 var_labels: ["sink_id", "worker_id"],
297 visibility: MetricVisibility::Public,
298 tags: [MetricTag::Sink],
299 )),
300 bytes_committed: registry.register(metric!(
301 name: "mz_sink_bytes_committed",
302 help: "The number of bytes committed to the sink.",
303 var_labels: ["sink_id", "worker_id"],
304 visibility: MetricVisibility::Public,
305 tags: [MetricTag::Sink],
306 )),
307 }
308 }
309}
310
311#[derive(Debug)]
313pub struct SinkStatisticsMetrics {
314 pub(crate) messages_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
316 pub(crate) messages_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
317 pub(crate) bytes_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
318 pub(crate) bytes_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
319}
320
321impl SinkStatisticsMetrics {
322 pub(crate) fn new(
323 defs: &SinkStatisticsMetricDefs,
324 id: GlobalId,
325 worker_id: usize,
326 ) -> SinkStatisticsMetrics {
327 SinkStatisticsMetrics {
328 messages_staged: defs
329 .messages_staged
330 .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
331 messages_committed: defs
332 .messages_committed
333 .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
334 bytes_staged: defs
335 .bytes_staged
336 .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
337 bytes_committed: defs
338 .bytes_committed
339 .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
340 }
341 }
342}
343
344#[derive(Debug, Clone)]
346pub struct SourceStatisticsMetadata {
347 resume_upper: Antichain<Timestamp>,
349 created_at: Instant,
351}
352
353impl SourceStatisticsMetadata {
354 pub fn new(resume_upper: Antichain<Timestamp>) -> Self {
356 Self {
357 resume_upper,
358 created_at: Instant::now(),
359 }
360 }
361}
362
/// Statistics values together with the Prometheus metrics that mirror them.
#[derive(Debug)]
struct StatsInner<Stats, Metrics> {
    /// The values reported through statistics snapshots.
    stats: Stats,
    /// The Prometheus handles updated alongside `stats`.
    prom: Metrics,
}

/// A cheaply clonable handle to the statistics of one storage object.
///
/// Clones share the same `StatsInner` through an `Rc<RefCell<_>>`, so an update made through
/// one handle is visible through all of them; each clone carries its own copy of `meta`.
#[derive(Debug)]
pub struct StorageStatistics<Stats, Metrics, Meta> {
    stats: Rc<RefCell<StatsInner<Stats, Metrics>>>,
    meta: Meta,
}

// Written by hand rather than derived so that only `Meta`, not `Stats` or `Metrics`, has to be
// `Clone`: the shared inner state is reference counted instead of copied.
impl<Stats, Metrics, Meta: Clone> Clone for StorageStatistics<Stats, Metrics, Meta> {
    fn clone(&self) -> Self {
        let stats = Rc::clone(&self.stats);
        let meta = Meta::clone(&self.meta);
        StorageStatistics { stats, meta }
    }
}
398
399#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
401pub struct SourceStatisticsRecord {
402 id: GlobalId,
403 worker_id: usize,
404 messages_received: u64,
406 bytes_received: u64,
407 updates_staged: u64,
408 updates_committed: u64,
409
410 records_indexed: Option<u64>,
413 bytes_indexed: Option<u64>,
414
415 rehydration_latency_ms: Option<Option<i64>>,
417
418 snapshot_records_known: Option<Option<u64>>,
421 snapshot_records_staged: Option<Option<u64>>,
422 snapshot_committed: Option<bool>,
423 offset_known: Option<Option<u64>>,
424 offset_committed: Option<Option<u64>>,
425
426 envelope_state_tombstones: u64,
428}
429
430impl SourceStatisticsRecord {
431 fn reset_gauges(&mut self) {
432 self.rehydration_latency_ms = None;
435 self.snapshot_committed = None;
436
437 self.bytes_indexed = Some(0);
439 self.records_indexed = Some(0);
440
441 self.snapshot_records_known = Some(None);
443 self.snapshot_records_staged = Some(None);
444 self.offset_known = Some(None);
445 self.offset_committed = Some(None);
446
447 self.envelope_state_tombstones = 0;
448 }
449
450 fn reset_counters(&mut self) {
452 self.messages_received = 0;
453 self.bytes_received = 0;
454 self.updates_staged = 0;
455 self.updates_committed = 0;
456 }
457
458 fn as_update(&self) -> SourceStatisticsUpdate {
461 let SourceStatisticsRecord {
462 id,
463 worker_id: _,
464 messages_received,
465 bytes_received,
466 updates_staged,
467 updates_committed,
468 records_indexed,
469 bytes_indexed,
470 rehydration_latency_ms,
471 snapshot_records_known,
472 snapshot_records_staged,
473 snapshot_committed,
474 offset_known,
475 offset_committed,
476 envelope_state_tombstones: _,
477 } = self.clone();
478
479 SourceStatisticsUpdate {
480 id,
481 messages_received: messages_received.into(),
482 bytes_received: bytes_received.into(),
483 updates_staged: updates_staged.into(),
484 updates_committed: updates_committed.into(),
485 records_indexed: Gauge::gauge(records_indexed.unwrap()),
486 bytes_indexed: Gauge::gauge(bytes_indexed.unwrap()),
487 rehydration_latency_ms: Gauge::gauge(rehydration_latency_ms.unwrap()),
488 snapshot_records_known: Gauge::gauge(snapshot_records_known.unwrap()),
489 snapshot_records_staged: Gauge::gauge(snapshot_records_staged.unwrap()),
490 snapshot_committed: Gauge::gauge(snapshot_committed.unwrap()),
491 offset_known: Gauge::gauge(offset_known.unwrap()),
492 offset_committed: Gauge::gauge(offset_committed.unwrap()),
493 }
494 }
495}
496
497#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
499pub struct SinkStatisticsRecord {
500 id: GlobalId,
501 worker_id: usize,
502 messages_staged: u64,
504 messages_committed: u64,
505 bytes_staged: u64,
506 bytes_committed: u64,
507}
508
509impl SinkStatisticsRecord {
510 fn reset_gauges(&self) {}
511
512 fn reset_counters(&mut self) {
514 self.messages_staged = 0;
515 self.messages_committed = 0;
516 self.bytes_staged = 0;
517 self.bytes_committed = 0;
518 }
519
520 fn as_update(&self) -> SinkStatisticsUpdate {
523 let SinkStatisticsRecord {
524 id,
525 worker_id: _,
526 messages_staged,
527 messages_committed,
528 bytes_staged,
529 bytes_committed,
530 } = self.clone();
531
532 SinkStatisticsUpdate {
533 id,
534 messages_staged: messages_staged.into(),
535 messages_committed: messages_committed.into(),
536 bytes_staged: bytes_staged.into(),
537 bytes_committed: bytes_committed.into(),
538 }
539 }
540}
541
/// Statistics handle for one source on one worker.
pub type SourceStatistics =
    StorageStatistics<SourceStatisticsRecord, SourceStatisticsMetrics, SourceStatisticsMetadata>;

/// Statistics handle for one sink on one worker; sinks carry no extra metadata (`()`).
pub type SinkStatistics = StorageStatistics<SinkStatisticsRecord, SinkStatisticsMetrics, ()>;
548
549impl SourceStatistics {
550 pub(crate) fn new(
551 id: GlobalId,
552 worker_id: usize,
553 metrics: &SourceStatisticsMetricDefs,
554 parent_source_id: GlobalId,
555 shard_id: &mz_persist_client::ShardId,
556 envelope: SourceEnvelope,
557 resume_upper: Antichain<Timestamp>,
558 ) -> Self {
559 Self {
560 stats: Rc::new(RefCell::new(StatsInner {
561 stats: SourceStatisticsRecord {
562 id,
563 worker_id,
564 messages_received: 0,
565 updates_staged: 0,
566 updates_committed: 0,
567 bytes_received: 0,
568 records_indexed: Some(0),
569 bytes_indexed: Some(0),
570 rehydration_latency_ms: None,
571 snapshot_records_staged: Some(None),
572 snapshot_records_known: Some(None),
573 snapshot_committed: None,
574 offset_known: Some(None),
575 offset_committed: Some(None),
576 envelope_state_tombstones: 0,
577 },
578 prom: SourceStatisticsMetrics::new(
579 metrics,
580 id,
581 worker_id,
582 parent_source_id,
583 shard_id,
584 envelope,
585 ),
586 })),
587 meta: SourceStatisticsMetadata::new(resume_upper),
588 }
589 }
590
591 pub fn reset_gauges(&self) {
594 let mut cur = self.stats.borrow_mut();
595 cur.stats.reset_gauges();
596 }
597
598 pub fn snapshot(&self) -> Option<SourceStatisticsRecord> {
602 let mut cur = self.stats.borrow_mut();
603
604 match &cur.stats {
605 SourceStatisticsRecord {
606 records_indexed: Some(_),
607 bytes_indexed: Some(_),
608 rehydration_latency_ms: Some(_),
609 snapshot_records_known: Some(_),
610 snapshot_records_staged: Some(_),
611 snapshot_committed: Some(_),
612 offset_known: Some(_),
613 offset_committed: Some(_),
614 ..
615 } => {
616 let ret = Some(cur.stats.clone());
617 cur.stats.reset_counters();
618 ret
619 }
620 _ => None,
621 }
622 }
623
624 pub fn initialize_snapshot_committed(&self, upper: &Antichain<Timestamp>) {
631 self.update_snapshot_committed(upper);
632 }
633
634 pub fn update_snapshot_committed(&self, upper: &Antichain<Timestamp>) {
636 let value = *upper != Antichain::from_elem(Timestamp::MIN);
637 let mut cur = self.stats.borrow_mut();
638 cur.stats.snapshot_committed = Some(value);
639 cur.prom.snapshot_committed.set(if value { 1 } else { 0 });
640 }
641
642 pub fn inc_messages_received_by(&self, value: u64) {
644 let mut cur = self.stats.borrow_mut();
645 cur.stats.messages_received = cur.stats.messages_received + value;
646 cur.prom.messages_received.inc_by(value);
647 }
648
649 pub fn inc_updates_staged_by(&self, value: u64) {
651 let mut cur = self.stats.borrow_mut();
652 cur.stats.updates_staged = cur.stats.updates_staged + value;
653 cur.prom.updates_staged.inc_by(value);
654 }
655
656 pub fn inc_updates_committed_by(&self, value: u64) {
658 let mut cur = self.stats.borrow_mut();
659 cur.stats.updates_committed = cur.stats.updates_committed + value;
660 cur.prom.updates_committed.inc_by(value);
661 }
662
663 pub fn inc_bytes_received_by(&self, value: u64) {
665 let mut cur = self.stats.borrow_mut();
666 cur.stats.bytes_received = cur.stats.bytes_received + value;
667 cur.prom.bytes_received.inc_by(value);
668 }
669
670 pub fn update_bytes_indexed_by(&self, value: i64) {
673 let mut cur = self.stats.borrow_mut();
674 if let Some(updated) = cur
675 .stats
676 .bytes_indexed
677 .unwrap_or(0)
678 .checked_add_signed(value)
679 {
680 cur.stats.bytes_indexed = Some(updated);
681 cur.prom.bytes_indexed.set(updated);
682 } else {
683 let bytes_indexed = cur.stats.bytes_indexed.unwrap_or(0);
684 tracing::warn!(
685 "Unexpected u64 overflow while updating bytes_indexed value {} with {}",
686 bytes_indexed,
687 value
688 );
689 cur.stats.bytes_indexed = Some(0);
690 cur.prom.bytes_indexed.set(0);
691 }
692 }
693
694 pub fn set_bytes_indexed(&self, value: i64) {
696 let mut cur = self.stats.borrow_mut();
697 let value = if value < 0 {
698 tracing::warn!("Unexpected negative value for bytes_indexed {}", value);
699 0
700 } else {
701 value.unsigned_abs()
702 };
703 cur.stats.bytes_indexed = Some(value);
704 cur.prom.bytes_indexed.set(value);
705 }
706
707 pub fn update_records_indexed_by(&self, value: i64) {
710 let mut cur = self.stats.borrow_mut();
711 if let Some(updated) = cur
712 .stats
713 .records_indexed
714 .unwrap_or(0)
715 .checked_add_signed(value)
716 {
717 cur.stats.records_indexed = Some(updated);
718 cur.prom.records_indexed.set(updated);
719 } else {
720 let records_indexed = cur.stats.records_indexed.unwrap_or(0);
721 tracing::warn!(
722 "Unexpected u64 overflow while updating records_indexed value {} with {}",
723 records_indexed,
724 value
725 );
726 cur.stats.records_indexed = Some(0);
727 cur.prom.records_indexed.set(0);
728 }
729 }
730
731 pub fn set_records_indexed(&self, value: i64) {
733 let mut cur = self.stats.borrow_mut();
734 let value = if value < 0 {
735 tracing::warn!("Unexpected negative value for records_indexed {}", value);
736 0
737 } else {
738 value.unsigned_abs()
739 };
740 cur.stats.records_indexed = Some(value);
741 cur.prom.records_indexed.set(value);
742 }
743
744 pub fn initialize_rehydration_latency_ms(&self) {
746 let mut cur = self.stats.borrow_mut();
747 cur.stats.rehydration_latency_ms = Some(None);
748 }
749
750 pub fn update_envelope_state_tombstones_by(&self, value: i64) {
754 let mut cur = self.stats.borrow_mut();
755 if let Some(updated) = cur
756 .stats
757 .envelope_state_tombstones
758 .checked_add_signed(value)
759 {
760 cur.stats.envelope_state_tombstones = updated;
761 cur.prom.envelope_state_tombstones.set(updated);
762 } else {
763 let envelope_state_tombstones = cur.stats.envelope_state_tombstones;
764 tracing::warn!(
765 "Unexpected u64 overflow while updating envelope_state_tombstones value {} with {}",
766 envelope_state_tombstones,
767 value
768 );
769 cur.stats.envelope_state_tombstones = 0;
770 cur.prom.envelope_state_tombstones.set(0);
771 }
772 }
773
774 pub fn update_rehydration_latency_ms(&self, upper: &Antichain<Timestamp>) {
776 let mut cur = self.stats.borrow_mut();
777
778 if matches!(cur.stats.rehydration_latency_ms, Some(Some(_))) {
779 return; }
781 if !PartialOrder::less_than(&self.meta.resume_upper, upper) {
782 return; }
784
785 let elapsed = self.meta.created_at.elapsed();
786 let value = elapsed
787 .as_millis()
788 .try_into()
789 .expect("Rehydration took more than ~584 million years!");
790 cur.stats.rehydration_latency_ms = Some(Some(value));
791 cur.prom.rehydration_latency_ms.set(value);
792 }
793
794 pub fn set_offset_known(&self, value: u64) {
796 let mut cur = self.stats.borrow_mut();
797 cur.stats.offset_known = Some(Some(value));
798 cur.prom.offset_known.set(value);
799 }
800
801 pub fn set_offset_committed(&self, value: u64) {
803 let mut cur = self.stats.borrow_mut();
804 cur.stats.offset_committed = Some(Some(value));
805 cur.prom.offset_committed.set(value);
806 }
807
808 pub fn set_snapshot_records_known(&self, value: u64) {
810 let mut cur = self.stats.borrow_mut();
811 cur.stats.snapshot_records_known = Some(Some(value));
812 cur.prom.snapshot_records_known.set(value);
813 }
814
815 pub fn set_snapshot_records_staged(&self, value: u64) {
817 let mut cur = self.stats.borrow_mut();
818 cur.stats.snapshot_records_staged = Some(Some(value));
819 cur.prom.snapshot_records_staged.set(value);
820 }
821}
822
823impl SinkStatistics {
824 pub(crate) fn new(id: GlobalId, worker_id: usize, metrics: &SinkStatisticsMetricDefs) -> Self {
825 Self {
826 stats: Rc::new(RefCell::new(StatsInner {
827 stats: SinkStatisticsRecord {
828 id,
829 worker_id,
830 messages_staged: 0,
831 messages_committed: 0,
832 bytes_staged: 0,
833 bytes_committed: 0,
834 },
835 prom: SinkStatisticsMetrics::new(metrics, id, worker_id),
836 })),
837 meta: (),
838 }
839 }
840
841 pub fn reset_gauges(&self) {
844 let cur = self.stats.borrow_mut();
845 cur.stats.reset_gauges();
846 }
847
848 pub fn snapshot(&self) -> Option<SinkStatisticsRecord> {
852 let mut cur = self.stats.borrow_mut();
853
854 match &cur.stats {
855 SinkStatisticsRecord { .. } => {
856 let ret = Some(cur.stats.clone());
857 cur.stats.reset_counters();
858 ret
859 }
860 }
861 }
862
863 pub fn inc_messages_staged_by(&self, value: u64) {
865 let mut cur = self.stats.borrow_mut();
866 cur.stats.messages_staged = cur.stats.messages_staged + value;
867 cur.prom.messages_staged.inc_by(value);
868 }
869
870 pub fn inc_bytes_staged_by(&self, value: u64) {
872 let mut cur = self.stats.borrow_mut();
873 cur.stats.bytes_staged = cur.stats.bytes_staged + value;
874 cur.prom.bytes_staged.inc_by(value);
875 }
876
877 pub fn inc_messages_committed_by(&self, value: u64) {
879 let mut cur = self.stats.borrow_mut();
880 cur.stats.messages_committed = cur.stats.messages_committed + value;
881 cur.prom.messages_committed.inc_by(value);
882 }
883
884 pub fn inc_bytes_committed_by(&self, value: u64) {
886 let mut cur = self.stats.borrow_mut();
887 cur.stats.bytes_committed = cur.stats.bytes_committed + value;
888 cur.prom.bytes_committed.inc_by(value);
889 }
890}
891
892pub struct AggregatedStatistics {
917 worker_id: usize,
918 worker_count: usize,
919 local_source_statistics: BTreeMap<GlobalId, (usize, GlobalId, SourceStatistics)>,
920 local_sink_statistics: BTreeMap<GlobalId, (usize, SinkStatistics)>,
921
922 global_source_statistics:
923 BTreeMap<GlobalId, (usize, GlobalId, Vec<Option<SourceStatisticsUpdate>>)>,
924 global_sink_statistics: BTreeMap<GlobalId, (usize, Vec<Option<SinkStatisticsUpdate>>)>,
925}
926
927impl AggregatedStatistics {
928 pub fn new(worker_id: usize, worker_count: usize) -> Self {
930 AggregatedStatistics {
931 worker_id,
932 worker_count,
933 local_source_statistics: Default::default(),
934 local_sink_statistics: Default::default(),
935 global_source_statistics: Default::default(),
936 global_sink_statistics: Default::default(),
937 }
938 }
939
940 pub fn get_ingestion_stats(
942 &self,
943 ingestion_id: &GlobalId,
944 ) -> BTreeMap<GlobalId, SourceStatistics> {
945 let mut ingestion_stats = BTreeMap::new();
946 for (id, (_epoch, ingestion_id2, stats)) in self.local_source_statistics.iter() {
947 if ingestion_id == ingestion_id2 {
948 ingestion_stats.insert(id.clone(), stats.clone());
949 }
950 }
951 ingestion_stats
952 }
953
954 pub fn get_source(&self, id: &GlobalId) -> Option<&SourceStatistics> {
956 self.local_source_statistics.get(id).map(|(_, _, s)| s)
957 }
958
959 pub fn get_sink(&self, id: &GlobalId) -> Option<&SinkStatistics> {
961 self.local_sink_statistics.get(id).map(|(_, s)| s)
962 }
963
964 pub fn deinitialize(&mut self, id: GlobalId) {
967 self.local_source_statistics.remove(&id);
968 self.local_sink_statistics.remove(&id);
969 self.global_source_statistics.remove(&id);
970 self.global_sink_statistics.remove(&id);
971 }
972
973 pub fn advance_global_epoch(&mut self, id: GlobalId) {
978 if let Some((epoch, _ingestion_id, stats)) = self.global_source_statistics.get_mut(&id) {
979 *epoch += 1;
980 for worker_stats in stats {
981 if let Some(update) = worker_stats {
982 update.reset_gauges();
983 }
984 }
985 }
986 if let Some((epoch, stats)) = self.global_sink_statistics.get_mut(&id) {
987 *epoch += 1;
988 for worker_stats in stats {
989 if let Some(update) = worker_stats {
990 update.reset_gauges();
991 }
992 }
993 }
994 }
995
996 pub fn initialize_source<F: FnOnce() -> SourceStatistics>(
998 &mut self,
999 id: GlobalId,
1000 ingestion_id: GlobalId,
1001 resume_upper: Antichain<Timestamp>,
1002 stats: F,
1003 ) {
1004 self.local_source_statistics
1005 .entry(id)
1006 .and_modify(|(epoch, ingestion_id2, stats)| {
1007 assert_eq!(ingestion_id, *ingestion_id2);
1008 *epoch += 1;
1009 stats.reset_gauges();
1010 stats.meta = SourceStatisticsMetadata::new(resume_upper);
1011 })
1012 .or_insert_with(|| (0, ingestion_id, stats()));
1013
1014 if self.worker_id == 0 {
1015 self.global_source_statistics
1016 .entry(id)
1017 .or_insert_with(|| (0, ingestion_id, vec![None; self.worker_count]));
1018 }
1019 }
1020
1021 pub fn initialize_sink<F: FnOnce() -> SinkStatistics>(&mut self, id: GlobalId, stats: F) {
1023 self.local_sink_statistics
1024 .entry(id)
1025 .and_modify(|(epoch, stats)| {
1026 *epoch += 1;
1027 stats.reset_gauges();
1028 })
1029 .or_insert_with(|| (0, stats()));
1030 if self.worker_id == 0 {
1031 self.global_sink_statistics
1032 .entry(id)
1033 .or_insert_with(|| (0, vec![None; self.worker_count]));
1034 }
1035 }
1036
1037 pub fn ingest(
1039 &mut self,
1040 source_statistics: Vec<(usize, SourceStatisticsRecord)>,
1041 sink_statistics: Vec<(usize, SinkStatisticsRecord)>,
1042 ) {
1043 if self.worker_id != 0 {
1045 return;
1046 }
1047
1048 for (epoch, stat) in source_statistics {
1049 if let Some((global_epoch, _, stats)) = self.global_source_statistics.get_mut(&stat.id)
1050 {
1051 let epoch_match = epoch >= *global_epoch;
1054 let mut update = stat.as_update();
1055 match (&mut stats[stat.worker_id], epoch_match) {
1056 (None, true) => stats[stat.worker_id] = Some(update),
1057 (None, false) => {
1058 update.reset_gauges();
1059 stats[stat.worker_id] = Some(update);
1060 }
1061 (Some(occupied), true) => occupied.incorporate(update),
1062 (Some(occupied), false) => occupied.incorporate_counters(update),
1063 }
1064 }
1065 }
1066
1067 for (epoch, stat) in sink_statistics {
1068 if let Some((global_epoch, stats)) = self.global_sink_statistics.get_mut(&stat.id) {
1069 let epoch_match = epoch >= *global_epoch;
1072 let update = stat.as_update();
1073 match (&mut stats[stat.worker_id], epoch_match) {
1074 (None, true) => stats[stat.worker_id] = Some(update),
1075 (None, false) => {
1076 update.reset_gauges();
1077 stats[stat.worker_id] = Some(update);
1078 }
1079 (Some(occupied), true) => occupied.incorporate(update),
1080 (Some(occupied), false) => occupied.incorporate_counters(update),
1081 }
1082 }
1083 }
1084 }
1085
1086 fn _emit_local(
1087 &mut self,
1088 ) -> (
1089 Vec<(usize, SourceStatisticsRecord)>,
1090 Vec<(usize, SinkStatisticsRecord)>,
1091 ) {
1092 let sources = self
1093 .local_source_statistics
1094 .values_mut()
1095 .flat_map(|(epoch, _, s)| s.snapshot().map(|v| (*epoch, v)))
1096 .collect();
1097
1098 let sinks = self
1099 .local_sink_statistics
1100 .values_mut()
1101 .flat_map(|(epoch, s)| s.snapshot().map(|v| (*epoch, v)))
1102 .collect();
1103
1104 (sources, sinks)
1105 }
1106
1107 pub fn emit_local(
1109 &mut self,
1110 ) -> (
1111 Vec<(usize, SourceStatisticsRecord)>,
1112 Vec<(usize, SinkStatisticsRecord)>,
1113 ) {
1114 if self.worker_id == 0 {
1117 return (Vec::new(), Vec::new());
1118 }
1119
1120 self._emit_local()
1121 }
1122
1123 pub fn snapshot(&mut self) -> (Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>) {
1126 if !self.worker_id == 0 {
1127 return (Vec::new(), Vec::new());
1128 }
1129
1130 let (sources, sinks) = self._emit_local();
1131 self.ingest(sources, sinks);
1132
1133 let sources = self
1134 .global_source_statistics
1135 .iter_mut()
1136 .filter_map(|(_, (_, _, s))| {
1137 if s.iter().all(|s| s.is_some()) {
1138 let ret = Some(SourceStatisticsUpdate::summarize(|| {
1139 s.iter().filter_map(Option::as_ref)
1140 }));
1141
1142 s.iter_mut().for_each(|s| {
1144 if let Some(s) = s {
1145 s.reset_counters();
1146 }
1147 });
1148 ret
1149 } else {
1150 None
1151 }
1152 })
1153 .collect();
1154
1155 let sinks = self
1156 .global_sink_statistics
1157 .iter_mut()
1158 .filter_map(|(_, (_, s))| {
1159 if s.iter().all(|s| s.is_some()) {
1160 let ret = Some(SinkStatisticsUpdate::summarize(|| {
1161 s.iter().filter_map(Option::as_ref)
1162 }));
1163
1164 s.iter_mut().for_each(|s| {
1166 if let Some(s) = s {
1167 s.reset_counters();
1168 }
1169 });
1170 ret
1171 } else {
1172 None
1173 }
1174 })
1175 .collect();
1176
1177 (sources, sinks)
1178 }
1179}