1use std::sync::LazyLock;
20use std::sync::atomic::{AtomicU64, Ordering};
21
22use serde::{Deserialize, Serialize};
23
24use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError};
25use mz_repr::{GlobalId, RelationDesc, Row, ScalarType};
26
27include!(concat!(env!("OUT_DIR"), "/mz_storage_client.statistics.rs"));
28
29pub static MZ_SOURCE_STATISTICS_RAW_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
30 RelationDesc::builder()
31 .with_column("id", ScalarType::String.nullable(false))
33 .with_column("messages_received", ScalarType::UInt64.nullable(false))
39 .with_column("bytes_received", ScalarType::UInt64.nullable(false))
42 .with_column("updates_staged", ScalarType::UInt64.nullable(false))
45 .with_column("updates_committed", ScalarType::UInt64.nullable(false))
48 .with_column("records_indexed", ScalarType::UInt64.nullable(false))
54 .with_column("bytes_indexed", ScalarType::UInt64.nullable(false))
57 .with_column("rehydration_latency", ScalarType::Interval.nullable(true))
61 .with_column("snapshot_records_known", ScalarType::UInt64.nullable(true))
69 .with_column("snapshot_records_staged", ScalarType::UInt64.nullable(true))
76 .with_column("snapshot_committed", ScalarType::Bool.nullable(false))
81 .with_column("offset_known", ScalarType::UInt64.nullable(true))
90 .with_column("offset_committed", ScalarType::UInt64.nullable(true))
93 .finish()
94});
95
96pub static MZ_SINK_STATISTICS_RAW_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
97 RelationDesc::builder()
98 .with_column("id", ScalarType::String.nullable(false))
100 .with_column("messages_staged", ScalarType::UInt64.nullable(false))
106 .with_column("messages_committed", ScalarType::UInt64.nullable(false))
109 .with_column("bytes_staged", ScalarType::UInt64.nullable(false))
112 .with_column("bytes_committed", ScalarType::UInt64.nullable(false))
115 .finish()
116});
117
118pub trait StorageMetric {
123 fn summarize<'a, I>(values: I) -> Self
126 where
127 I: IntoIterator<Item = &'a Self>,
128 Self: Sized + 'a;
129
130 fn incorporate(&mut self, other: Self, field_name: &'static str);
132}
133
134#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
136pub struct Counter(u64);
137
138impl StorageMetric for Counter {
139 fn summarize<'a, I>(values: I) -> Self
140 where
141 I: IntoIterator<Item = &'a Self>,
142 Self: Sized + 'a,
143 {
144 Self(values.into_iter().map(|c| c.0).sum())
146 }
147
148 fn incorporate(&mut self, other: Self, _field_name: &'static str) {
149 self.0 += other.0
151 }
152}
153
154impl From<u64> for Counter {
155 fn from(f: u64) -> Self {
156 Counter(f)
157 }
158}
159
160#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
162pub struct ResettingLatency(Option<i64>);
163
164impl From<Option<i64>> for ResettingLatency {
165 fn from(f: Option<i64>) -> Self {
166 ResettingLatency(f)
167 }
168}
169
170impl StorageMetric for ResettingLatency {
171 fn summarize<'a, I>(values: I) -> Self
172 where
173 I: IntoIterator<Item = &'a Self>,
174 Self: Sized + 'a,
175 {
176 let mut max = 0;
177 for value in values {
178 match value.0 {
179 None => return Self(None),
181 Some(value) => max = std::cmp::max(max, value),
183 }
184 }
185
186 Self(Some(max))
187 }
188
189 fn incorporate(&mut self, other: Self, _field_name: &'static str) {
190 self.0 = other.0;
192 }
193}
194
195impl ResettingLatency {
196 fn reset(&mut self) {
197 self.0 = None;
198 }
199}
200
201#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
203pub struct ResettingNullableTotal(Option<u64>);
204
205impl StorageMetric for ResettingNullableTotal {
206 fn summarize<'a, I>(values: I) -> Self
207 where
208 I: IntoIterator<Item = &'a Self>,
209 Self: Sized + 'a,
210 {
211 let mut sum = 0;
212 for value in values {
213 match value.0 {
214 None => return Self(None),
216 Some(value) => sum += value,
218 }
219 }
220
221 Self(Some(sum))
222 }
223
224 fn incorporate(&mut self, other: Self, _field_name: &'static str) {
225 match (&mut self.0, other.0) {
226 (None, other) => {
227 self.0 = other;
228 }
229 (Some(this), Some(other)) => *this = other,
231 (Some(_), None) => {
232 }
234 }
235 }
236}
237
238impl From<Option<u64>> for ResettingNullableTotal {
239 fn from(f: Option<u64>) -> Self {
240 ResettingNullableTotal(f)
241 }
242}
243
244impl ResettingNullableTotal {
245 fn reset(&mut self) {
246 self.0 = None;
247 }
248}
249
250#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
252pub struct ResettingTotal(u64);
253
254impl StorageMetric for ResettingTotal {
255 fn summarize<'a, I>(values: I) -> Self
256 where
257 I: IntoIterator<Item = &'a Self>,
258 Self: Sized + 'a,
259 {
260 Self(values.into_iter().map(|c| c.0).sum())
262 }
263
264 fn incorporate(&mut self, other: Self, _field_name: &'static str) {
265 self.0 = other.0;
267 }
268}
269
270impl From<u64> for ResettingTotal {
271 fn from(f: u64) -> Self {
272 ResettingTotal(f)
273 }
274}
275
276impl ResettingTotal {
277 fn reset(&mut self) {
278 self.0 = 0;
279 }
280}
281
282#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
284pub struct Boolean(bool);
285
286impl StorageMetric for Boolean {
287 fn summarize<'a, I>(values: I) -> Self
288 where
289 I: IntoIterator<Item = &'a Self>,
290 Self: Sized + 'a,
291 {
292 Self(values.into_iter().fold(true, |base, new| base & new.0))
294 }
295
296 fn incorporate(&mut self, other: Self, field_name: &'static str) {
297 #[allow(clippy::bool_comparison)]
301 if other.0 < self.0 {
302 tracing::error!(
303 "boolean gauge for field {field_name} erroneously regressed from true to false",
304 );
305 return;
306 }
307 self.0 = other.0;
308 }
309}
310
311impl From<bool> for Boolean {
312 fn from(f: bool) -> Self {
313 Boolean(f)
314 }
315}
316
317#[derive(Debug, Serialize, Deserialize, Default)]
319pub struct Total {
320 total: Option<u64>,
322 #[serde(skip)]
325 regressions:
326 Option<mz_ore::metrics::DeleteOnDropCounter<prometheus::core::AtomicU64, Vec<String>>>,
327}
328
329impl From<Option<u64>> for Total {
330 fn from(f: Option<u64>) -> Self {
331 Total {
332 total: f,
333 regressions: None,
334 }
335 }
336}
337
338impl Clone for Total {
339 fn clone(&self) -> Self {
340 Self {
341 total: self.total,
342 regressions: None,
343 }
344 }
345}
346
347impl PartialEq for Total {
348 fn eq(&self, other: &Self) -> bool {
349 self.total == other.total
350 }
351}
352
353impl Total {
354 fn pack(&self) -> u64 {
356 self.total.unwrap_or_default()
357 }
358}
359
360impl StorageMetric for Total {
361 fn summarize<'a, I>(values: I) -> Self
362 where
363 I: IntoIterator<Item = &'a Self>,
364 Self: Sized + 'a,
365 {
366 let mut any_none = false;
369
370 let inner = values
371 .into_iter()
372 .filter_map(|i| {
373 any_none |= i.total.is_none();
374 i.total.as_ref()
375 })
376 .sum();
377
378 Self {
381 total: (!any_none).then_some(inner),
382 regressions: None,
383 }
384 }
385
386 fn incorporate(&mut self, other: Self, field_name: &'static str) {
387 match (&mut self.total, other.total) {
388 (_, None) => {}
389 (None, Some(other)) => self.total = Some(other),
390 (Some(this), Some(other)) => {
391 if other < *this {
392 if let Some(metric) = &self.regressions {
393 metric.inc()
394 } else {
395 tracing::error!(
396 "total gauge {field_name} erroneously regressed from {} to {}",
397 this,
398 other
399 );
400 }
401 return;
402 }
403 *this = other
404 }
405 }
406 }
407}
408
409#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
411pub struct Gauge<T>(T);
412
413impl<T> Gauge<T> {
414 pub fn gauge<F>(f: F) -> Self
416 where
417 T: From<F>,
418 {
419 Gauge(f.into())
420 }
421}
422
423impl<T: StorageMetric> StorageMetric for Gauge<T> {
424 fn summarize<'a, I>(values: I) -> Self
425 where
426 I: IntoIterator<Item = &'a Self>,
427 Self: Sized + 'a,
428 {
429 Gauge(T::summarize(values.into_iter().map(|i| &i.0)))
430 }
431
432 fn incorporate(&mut self, other: Self, field_name: &'static str) {
433 self.0.incorporate(other.0, field_name)
434 }
435}
436
437pub trait PackableStats {
440 fn pack(&self, packer: mz_repr::RowPacker<'_>);
442 fn unpack(row: Row, metrics: &crate::metrics::StorageControllerMetrics) -> (GlobalId, Self);
444}
445
446#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
450pub struct SourceStatisticsUpdate {
451 pub id: GlobalId,
452
453 pub messages_received: Counter,
454 pub bytes_received: Counter,
455 pub updates_staged: Counter,
456 pub updates_committed: Counter,
457
458 pub records_indexed: Gauge<ResettingTotal>,
459 pub bytes_indexed: Gauge<ResettingTotal>,
460 pub rehydration_latency_ms: Gauge<ResettingLatency>,
461 pub snapshot_records_known: Gauge<ResettingNullableTotal>,
462 pub snapshot_records_staged: Gauge<ResettingNullableTotal>,
463
464 pub snapshot_committed: Gauge<Boolean>,
465 pub offset_known: Gauge<Total>,
484 pub offset_committed: Gauge<Total>,
485}
486
487impl SourceStatisticsUpdate {
488 pub fn new(id: GlobalId) -> Self {
489 Self {
490 id,
491 messages_received: Default::default(),
492 bytes_received: Default::default(),
493 updates_staged: Default::default(),
494 updates_committed: Default::default(),
495 records_indexed: Default::default(),
496 bytes_indexed: Default::default(),
497 rehydration_latency_ms: Default::default(),
498 snapshot_records_known: Default::default(),
499 snapshot_records_staged: Default::default(),
500 snapshot_committed: Default::default(),
501 offset_known: Default::default(),
502 offset_committed: Default::default(),
503 }
504 }
505
506 pub fn summarize<'a, I, F>(values: F) -> Self
507 where
508 I: IntoIterator<Item = &'a Self>,
509 F: Fn() -> I,
510 Self: 'a,
511 {
512 SourceStatisticsUpdate {
513 id: values().into_iter().next().unwrap().id,
514 messages_received: Counter::summarize(
515 values().into_iter().map(|s| &s.messages_received),
516 ),
517 bytes_received: Counter::summarize(values().into_iter().map(|s| &s.bytes_received)),
518 updates_staged: Counter::summarize(values().into_iter().map(|s| &s.updates_staged)),
519 updates_committed: Counter::summarize(
520 values().into_iter().map(|s| &s.updates_committed),
521 ),
522 records_indexed: Gauge::summarize(values().into_iter().map(|s| &s.records_indexed)),
523 bytes_indexed: Gauge::summarize(values().into_iter().map(|s| &s.bytes_indexed)),
524 rehydration_latency_ms: Gauge::summarize(
525 values().into_iter().map(|s| &s.rehydration_latency_ms),
526 ),
527 snapshot_records_known: Gauge::summarize(
528 values().into_iter().map(|s| &s.snapshot_records_known),
529 ),
530 snapshot_records_staged: Gauge::summarize(
531 values().into_iter().map(|s| &s.snapshot_records_staged),
532 ),
533 snapshot_committed: Gauge::summarize(
534 values().into_iter().map(|s| &s.snapshot_committed),
535 ),
536 offset_known: Gauge::summarize(values().into_iter().map(|s| &s.offset_known)),
537 offset_committed: Gauge::summarize(values().into_iter().map(|s| &s.offset_committed)),
538 }
539 }
540
541 pub fn reset_counters(&mut self) {
543 self.messages_received.0 = 0;
544 self.bytes_received.0 = 0;
545 self.updates_staged.0 = 0;
546 self.updates_committed.0 = 0;
547 }
548
549 pub fn reset_gauges(&mut self) {
551 self.records_indexed.0.reset();
552 self.bytes_indexed.0.reset();
553 self.rehydration_latency_ms.0.reset();
554 self.snapshot_records_known.0.reset();
555 self.snapshot_records_staged.0.reset();
556 }
557
558 pub fn incorporate(&mut self, other: SourceStatisticsUpdate) {
559 let SourceStatisticsUpdate {
560 messages_received,
561 bytes_received,
562 updates_staged,
563 updates_committed,
564 records_indexed,
565 bytes_indexed,
566 rehydration_latency_ms,
567 snapshot_records_known,
568 snapshot_records_staged,
569 snapshot_committed,
570 offset_known,
571 offset_committed,
572 ..
573 } = self;
574
575 messages_received.incorporate(other.messages_received, "messages_received");
576 bytes_received.incorporate(other.bytes_received, "bytes_received");
577 updates_staged.incorporate(other.updates_staged, "updates_staged");
578 updates_committed.incorporate(other.updates_committed, "updates_committed");
579 records_indexed.incorporate(other.records_indexed, "records_indexed");
580 bytes_indexed.incorporate(other.bytes_indexed, "bytes_indexed");
581 rehydration_latency_ms.incorporate(other.rehydration_latency_ms, "rehydration_latency_ms");
582 snapshot_records_known.incorporate(other.snapshot_records_known, "snapshot_records_known");
583 snapshot_records_staged
584 .incorporate(other.snapshot_records_staged, "snapshot_records_staged");
585 snapshot_committed.incorporate(other.snapshot_committed, "snapshot_committed");
586 offset_known.incorporate(other.offset_known, "offset_known");
587 offset_committed.incorporate(other.offset_committed, "offset_committed");
588 }
589
590 pub fn incorporate_counters(&mut self, other: SourceStatisticsUpdate) {
592 let SourceStatisticsUpdate {
593 messages_received,
594 bytes_received,
595 updates_staged,
596 updates_committed,
597 ..
598 } = self;
599
600 messages_received.incorporate(other.messages_received, "messages_received");
601 bytes_received.incorporate(other.bytes_received, "bytes_received");
602 updates_staged.incorporate(other.updates_staged, "updates_staged");
603 updates_committed.incorporate(other.updates_committed, "updates_committed");
604 }
605
606 pub fn with_metrics(mut self, metrics: &crate::metrics::StorageControllerMetrics) -> Self {
608 self.offset_known.0.regressions = Some(metrics.regressed_offset_known(self.id));
609 self
610 }
611}
612
613impl PackableStats for SourceStatisticsUpdate {
614 fn pack(&self, mut packer: mz_repr::RowPacker<'_>) {
615 use mz_repr::Datum;
616 packer.push(Datum::from(self.id.to_string().as_str()));
618 packer.push(Datum::from(self.messages_received.0));
620 packer.push(Datum::from(self.bytes_received.0));
621 packer.push(Datum::from(self.updates_staged.0));
622 packer.push(Datum::from(self.updates_committed.0));
623 packer.push(Datum::from(self.records_indexed.0.0));
625 packer.push(Datum::from(self.bytes_indexed.0.0));
626 let rehydration_latency = self
627 .rehydration_latency_ms
628 .0
629 .0
630 .map(|ms| mz_repr::adt::interval::Interval::new(0, 0, ms * 1000));
631 packer.push(Datum::from(rehydration_latency));
632 packer.push(Datum::from(self.snapshot_records_known.0.0));
633 packer.push(Datum::from(self.snapshot_records_staged.0.0));
634 packer.push(Datum::from(self.snapshot_committed.0.0));
636 packer.push(Datum::from(self.offset_known.0.pack()));
637 packer.push(Datum::from(self.offset_committed.0.pack()));
638 }
639
640 fn unpack(row: Row, metrics: &crate::metrics::StorageControllerMetrics) -> (GlobalId, Self) {
641 let mut iter = row.iter();
642 let mut s = Self {
643 id: iter.next().unwrap().unwrap_str().parse().unwrap(),
644
645 messages_received: iter.next().unwrap().unwrap_uint64().into(),
646 bytes_received: iter.next().unwrap().unwrap_uint64().into(),
647 updates_staged: iter.next().unwrap().unwrap_uint64().into(),
648 updates_committed: iter.next().unwrap().unwrap_uint64().into(),
649
650 records_indexed: Gauge::gauge(iter.next().unwrap().unwrap_uint64()),
651 bytes_indexed: Gauge::gauge(iter.next().unwrap().unwrap_uint64()),
652 rehydration_latency_ms: Gauge::gauge(
653 <Option<mz_repr::adt::interval::Interval>>::try_from(iter.next().unwrap())
654 .unwrap()
655 .map(|int| int.micros / 1000),
656 ),
657 snapshot_records_known: Gauge::gauge(
658 <Option<u64>>::try_from(iter.next().unwrap()).unwrap(),
659 ),
660 snapshot_records_staged: Gauge::gauge(
661 <Option<u64>>::try_from(iter.next().unwrap()).unwrap(),
662 ),
663
664 snapshot_committed: Gauge::gauge(iter.next().unwrap().unwrap_bool()),
665 offset_known: Gauge::gauge(Some(iter.next().unwrap().unwrap_uint64())),
666 offset_committed: Gauge::gauge(Some(iter.next().unwrap().unwrap_uint64())),
667 };
668
669 s.offset_known.0.regressions = Some(metrics.regressed_offset_known(s.id));
670 (s.id, s)
671 }
672}
673
674impl RustType<ProtoSourceStatisticsUpdate> for SourceStatisticsUpdate {
675 fn into_proto(&self) -> ProtoSourceStatisticsUpdate {
676 ProtoSourceStatisticsUpdate {
677 id: Some(self.id.into_proto()),
678
679 messages_received: self.messages_received.0,
680 bytes_received: self.bytes_received.0,
681 updates_staged: self.updates_staged.0,
682 updates_committed: self.updates_committed.0,
683
684 records_indexed: self.records_indexed.0.0,
685 bytes_indexed: self.bytes_indexed.0.0,
686 rehydration_latency_ms: self.rehydration_latency_ms.0.0,
687 snapshot_records_known: self.snapshot_records_known.0.0,
688 snapshot_records_staged: self.snapshot_records_staged.0.0,
689
690 snapshot_committed: self.snapshot_committed.0.0,
691 offset_known: self.offset_known.0.total,
692 offset_committed: self.offset_committed.0.total,
693 }
694 }
695
696 fn from_proto(proto: ProtoSourceStatisticsUpdate) -> Result<Self, TryFromProtoError> {
697 Ok(SourceStatisticsUpdate {
698 id: proto
699 .id
700 .into_rust_if_some("ProtoSourceStatisticsUpdate::id")?,
701
702 messages_received: Counter(proto.messages_received),
703 bytes_received: Counter(proto.bytes_received),
704 updates_staged: Counter(proto.updates_staged),
705 updates_committed: Counter(proto.updates_committed),
706
707 records_indexed: Gauge::gauge(proto.records_indexed),
708 bytes_indexed: Gauge::gauge(proto.bytes_indexed),
709 rehydration_latency_ms: Gauge::gauge(proto.rehydration_latency_ms),
710 snapshot_records_known: Gauge::gauge(proto.snapshot_records_known),
711 snapshot_records_staged: Gauge::gauge(proto.snapshot_records_staged),
712
713 snapshot_committed: Gauge::gauge(proto.snapshot_committed),
714 offset_known: Gauge::gauge(proto.offset_known),
715 offset_committed: Gauge::gauge(proto.offset_committed),
716 })
717 }
718}
719
720#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
724pub struct SinkStatisticsUpdate {
725 pub id: GlobalId,
726
727 pub messages_staged: Counter,
728 pub messages_committed: Counter,
729 pub bytes_staged: Counter,
730 pub bytes_committed: Counter,
731}
732
733impl SinkStatisticsUpdate {
734 pub fn new(id: GlobalId) -> Self {
735 Self {
736 id,
737 messages_staged: Default::default(),
738 messages_committed: Default::default(),
739 bytes_staged: Default::default(),
740 bytes_committed: Default::default(),
741 }
742 }
743
744 pub fn incorporate(&mut self, other: SinkStatisticsUpdate) {
745 let SinkStatisticsUpdate {
746 messages_staged,
747 messages_committed,
748 bytes_staged,
749 bytes_committed,
750 ..
751 } = self;
752
753 messages_staged.incorporate(other.messages_staged, "messages_staged");
754 messages_committed.incorporate(other.messages_committed, "messages_committed");
755 bytes_staged.incorporate(other.bytes_staged, "bytes_staged");
756 bytes_committed.incorporate(other.bytes_committed, "bytes_committed");
757 }
758
759 pub fn incorporate_counters(&mut self, other: SinkStatisticsUpdate) {
761 let SinkStatisticsUpdate {
762 messages_staged,
763 messages_committed,
764 bytes_staged,
765 bytes_committed,
766 ..
767 } = self;
768
769 messages_staged.incorporate(other.messages_staged, "messages_staged");
770 messages_committed.incorporate(other.messages_committed, "messages_committed");
771 bytes_staged.incorporate(other.bytes_staged, "bytes_staged");
772 bytes_committed.incorporate(other.bytes_committed, "bytes_committed");
773 }
774
775 pub fn summarize<'a, I, F>(values: F) -> Self
776 where
777 I: IntoIterator<Item = &'a Self>,
778 F: Fn() -> I,
779 Self: 'a,
780 {
781 SinkStatisticsUpdate {
782 id: values().into_iter().next().unwrap().id,
783 messages_staged: Counter::summarize(values().into_iter().map(|s| &s.messages_staged)),
784 messages_committed: Counter::summarize(
785 values().into_iter().map(|s| &s.messages_committed),
786 ),
787 bytes_staged: Counter::summarize(values().into_iter().map(|s| &s.bytes_staged)),
788 bytes_committed: Counter::summarize(values().into_iter().map(|s| &s.bytes_committed)),
789 }
790 }
791
792 pub fn reset_counters(&mut self) {
794 self.messages_staged.0 = 0;
795 self.messages_committed.0 = 0;
796 self.bytes_staged.0 = 0;
797 self.bytes_committed.0 = 0;
798 }
799
800 pub fn reset_gauges(&self) {}
802}
803
804impl PackableStats for SinkStatisticsUpdate {
805 fn pack(&self, mut packer: mz_repr::RowPacker<'_>) {
806 use mz_repr::Datum;
807 packer.push(Datum::from(self.id.to_string().as_str()));
808 packer.push(Datum::from(self.messages_staged.0));
809 packer.push(Datum::from(self.messages_committed.0));
810 packer.push(Datum::from(self.bytes_staged.0));
811 packer.push(Datum::from(self.bytes_committed.0));
812 }
813
814 fn unpack(row: Row, _metrics: &crate::metrics::StorageControllerMetrics) -> (GlobalId, Self) {
815 let mut iter = row.iter();
816 let s = Self {
817 id: iter.next().unwrap().unwrap_str().parse().unwrap(),
819 messages_staged: iter.next().unwrap().unwrap_uint64().into(),
821 messages_committed: iter.next().unwrap().unwrap_uint64().into(),
822 bytes_staged: iter.next().unwrap().unwrap_uint64().into(),
823 bytes_committed: iter.next().unwrap().unwrap_uint64().into(),
824 };
825
826 (s.id, s)
827 }
828}
829
830impl RustType<ProtoSinkStatisticsUpdate> for SinkStatisticsUpdate {
831 fn into_proto(&self) -> ProtoSinkStatisticsUpdate {
832 ProtoSinkStatisticsUpdate {
833 id: Some(self.id.into_proto()),
834
835 messages_staged: self.messages_staged.0,
836 messages_committed: self.messages_committed.0,
837 bytes_staged: self.bytes_staged.0,
838 bytes_committed: self.bytes_committed.0,
839 }
840 }
841
842 fn from_proto(proto: ProtoSinkStatisticsUpdate) -> Result<Self, TryFromProtoError> {
843 Ok(SinkStatisticsUpdate {
844 id: proto
845 .id
846 .into_rust_if_some("ProtoSinkStatisticsUpdate::id")?,
847
848 messages_staged: Counter(proto.messages_staged),
849 messages_committed: Counter(proto.messages_committed),
850 bytes_staged: Counter(proto.bytes_staged),
851 bytes_committed: Counter(proto.bytes_committed),
852 })
853 }
854}
855
856#[derive(Default, Debug)]
858pub struct WebhookStatistics {
859 pub messages_received: AtomicU64,
860 pub bytes_received: AtomicU64,
861 pub updates_staged: AtomicU64,
862 pub updates_committed: AtomicU64,
863}
864
865impl WebhookStatistics {
866 pub fn drain_into_update(&self, id: GlobalId) -> SourceStatisticsUpdate {
869 SourceStatisticsUpdate {
870 id,
871 messages_received: self.messages_received.swap(0, Ordering::Relaxed).into(),
872 bytes_received: self.bytes_received.swap(0, Ordering::Relaxed).into(),
873 updates_staged: self.updates_staged.swap(0, Ordering::Relaxed).into(),
874 updates_committed: self.updates_committed.swap(0, Ordering::Relaxed).into(),
875 records_indexed: Gauge::gauge(0),
876 bytes_indexed: Gauge::gauge(0),
877 rehydration_latency_ms: Gauge::gauge(None),
878 snapshot_records_known: Gauge::gauge(None),
879 snapshot_records_staged: Gauge::gauge(None),
880 snapshot_committed: Gauge::gauge(true),
881 offset_known: Gauge::gauge(None::<u64>),
882 offset_committed: Gauge::gauge(None::<u64>),
883 }
884 }
885}