//! Types for source and sink statistics: the values reported by storage
//! workers, and the aggregated views maintained by the storage controller.

use std::sync::LazyLock;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

use mz_controller_types::ReplicaId;
use mz_repr::{GlobalId, RelationDesc, Row, SqlScalarType};
use serde::{Deserialize, Serialize};

/// The `RelationDesc` for the raw source statistics collection.
pub static MZ_SOURCE_STATISTICS_RAW_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
    RelationDesc::builder()
        .with_column("id", SqlScalarType::String.nullable(false))
        .with_column("replica_id", SqlScalarType::String.nullable(true))
        .with_column("messages_received", SqlScalarType::UInt64.nullable(false))
        .with_column("bytes_received", SqlScalarType::UInt64.nullable(false))
        .with_column("updates_staged", SqlScalarType::UInt64.nullable(false))
        .with_column("updates_committed", SqlScalarType::UInt64.nullable(false))
        .with_column("records_indexed", SqlScalarType::UInt64.nullable(false))
        .with_column("bytes_indexed", SqlScalarType::UInt64.nullable(false))
        .with_column(
            "rehydration_latency",
            SqlScalarType::Interval.nullable(true),
        )
        .with_column(
            "snapshot_records_known",
            SqlScalarType::UInt64.nullable(true),
        )
        .with_column(
            "snapshot_records_staged",
            SqlScalarType::UInt64.nullable(true),
        )
        .with_column("snapshot_committed", SqlScalarType::Bool.nullable(false))
        .with_column("offset_known", SqlScalarType::UInt64.nullable(true))
        .with_column("offset_committed", SqlScalarType::UInt64.nullable(true))
        .finish()
});

/// The `RelationDesc` for the raw sink statistics collection.
pub static MZ_SINK_STATISTICS_RAW_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
    RelationDesc::builder()
        .with_column("id", SqlScalarType::String.nullable(false))
        .with_column("replica_id", SqlScalarType::String.nullable(true))
        .with_column("messages_staged", SqlScalarType::UInt64.nullable(false))
        .with_column("messages_committed", SqlScalarType::UInt64.nullable(false))
        .with_column("bytes_staged", SqlScalarType::UInt64.nullable(false))
        .with_column("bytes_committed", SqlScalarType::UInt64.nullable(false))
        .finish()
});

/// A trait implemented by the value of each individual statistics field.
pub trait StorageMetric {
    /// Aggregate a set of values reported by individual workers or replicas
    /// into a single summary value.
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a;

    /// Fold a newly reported value for the same metric into this one.
    /// `field_name` is used only for error reporting.
    fn incorporate(&mut self, other: Self, field_name: &'static str);
}

/// A counter that never resets; summaries and incorporations both add.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct Counter(u64);

impl StorageMetric for Counter {
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        // Counters are additive across workers and replicas.
        Self(values.into_iter().map(|c| c.0).sum())
    }

    fn incorporate(&mut self, other: Self, _field_name: &'static str) {
        // Counters also add when a new value is incorporated.
        self.0 += other.0
    }
}

impl From<u64> for Counter {
    fn from(f: u64) -> Self {
        Counter(f)
    }
}
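
// The following is a minimal illustrative sketch (not part of the original
// source) of `Counter` semantics: `summarize` sums a set of per-worker values,
// while `incorporate` adds a newly reported value onto the running one.
#[cfg(test)]
mod counter_sketch {
    use super::*;

    #[test]
    fn counter_sums_and_accumulates() {
        let per_worker = [Counter(3), Counter(4)];
        assert_eq!(Counter::summarize(per_worker.iter()), Counter(7));

        let mut running = Counter(10);
        running.incorporate(Counter(5), "messages_received");
        assert_eq!(running, Counter(15));
    }
}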

/// A latency gauge whose value can be reset to an unknown state (`None`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct ResettingLatency(Option<i64>);

impl From<Option<i64>> for ResettingLatency {
    fn from(f: Option<i64>) -> Self {
        ResettingLatency(f)
    }
}

impl StorageMetric for ResettingLatency {
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        let mut max = 0;
        for value in values {
            match value.0 {
                // If any of the partial values are unknown, the summary is
                // unknown as well.
                None => return Self(None),
                // Otherwise the summary is the worst (maximum) latency.
                Some(value) => max = std::cmp::max(max, value),
            }
        }

        Self(Some(max))
    }

    fn incorporate(&mut self, other: Self, _field_name: &'static str) {
        // Gauges take the most recently reported value.
        self.0 = other.0;
    }
}

impl ResettingLatency {
    fn reset(&mut self) {
        self.0 = None;
    }
}
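
// A minimal sketch (not part of the original source) of the summary semantics
// above: the summary is the maximum latency across replicas, unless any
// replica's latency is still unknown, in which case the summary is unknown.
#[cfg(test)]
mod resetting_latency_sketch {
    use super::*;

    #[test]
    fn latency_summary_is_max_unless_unknown() {
        let known = [ResettingLatency(Some(5)), ResettingLatency(Some(9))];
        assert_eq!(
            ResettingLatency::summarize(known.iter()),
            ResettingLatency(Some(9))
        );

        let with_unknown = [ResettingLatency(Some(5)), ResettingLatency(None)];
        assert_eq!(
            ResettingLatency::summarize(with_unknown.iter()),
            ResettingLatency(None)
        );
    }
}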

/// A nullable total gauge that can be reset to an unknown state (`None`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct ResettingNullableTotal(Option<u64>);

impl StorageMetric for ResettingNullableTotal {
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        let mut sum = 0;
        for value in values {
            match value.0 {
                // If any of the partial values are unknown, the summary is
                // unknown as well.
                None => return Self(None),
                // Otherwise the summary is the sum of the partial values.
                Some(value) => sum += value,
            }
        }

        Self(Some(sum))
    }

    fn incorporate(&mut self, other: Self, _field_name: &'static str) {
        match (&mut self.0, other.0) {
            (None, other) => {
                self.0 = other;
            }
            // Gauges take the most recently reported value.
            (Some(this), Some(other)) => *this = other,
            // An incoming `None` does not clear an already-known value.
            (Some(_), None) => {}
        }
    }
}

impl From<Option<u64>> for ResettingNullableTotal {
    fn from(f: Option<u64>) -> Self {
        ResettingNullableTotal(f)
    }
}

impl ResettingNullableTotal {
    fn reset(&mut self) {
        self.0 = None;
    }
}

/// A total gauge that can be reset to 0.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct ResettingTotal(u64);

impl StorageMetric for ResettingTotal {
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        // Totals are additive across workers and replicas.
        Self(values.into_iter().map(|c| c.0).sum())
    }

    fn incorporate(&mut self, other: Self, _field_name: &'static str) {
        // Gauges take the most recently reported value.
        self.0 = other.0;
    }
}

impl From<u64> for ResettingTotal {
    fn from(f: u64) -> Self {
        ResettingTotal(f)
    }
}

impl ResettingTotal {
    fn reset(&mut self) {
        self.0 = 0;
    }
}

/// A boolean gauge that is expected to only ever move from `false` to `true`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct Boolean(bool);

impl StorageMetric for Boolean {
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        // The summary is `true` only if all partial values are `true`.
        Self(values.into_iter().fold(true, |base, new| base & new.0))
    }

    fn incorporate(&mut self, other: Self, field_name: &'static str) {
        // A regression from `true` to `false` is a bug; log it and keep the
        // current value instead of applying it.
        #[allow(clippy::bool_comparison)]
        if other.0 < self.0 {
            tracing::error!(
                "boolean gauge for field {field_name} erroneously regressed from true to false",
            );
            return;
        }
        self.0 = other.0;
    }
}

impl From<bool> for Boolean {
    fn from(f: bool) -> Self {
        Boolean(f)
    }
}

/// A numerical gauge that is never expected to decrease. `None` means the
/// value is not yet known.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Total {
    total: Option<u64>,
    /// An optional metric for recording erroneous regressions. Not serialized,
    /// and ignored by `Clone` and `PartialEq`.
    #[serde(skip)]
    regressions:
        Option<mz_ore::metrics::DeleteOnDropCounter<prometheus::core::AtomicU64, Vec<String>>>,
}

impl From<Option<u64>> for Total {
    fn from(f: Option<u64>) -> Self {
        Total {
            total: f,
            regressions: None,
        }
    }
}

impl Clone for Total {
    fn clone(&self) -> Self {
        Self {
            total: self.total,
            regressions: None,
        }
    }
}

impl PartialEq for Total {
    fn eq(&self, other: &Self) -> bool {
        self.total == other.total
    }
}

impl Total {
    /// Pack the total into a `u64`, defaulting to 0 when unknown.
    fn pack(&self) -> u64 {
        self.total.unwrap_or_default()
    }
}

impl StorageMetric for Total {
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        // The summary is the sum of the partial values, but is unknown if any
        // of them is unknown.
        let mut any_none = false;

        let inner = values
            .into_iter()
            .filter_map(|i| {
                any_none |= i.total.is_none();
                i.total.as_ref()
            })
            .sum();

        Self {
            total: (!any_none).then_some(inner),
            regressions: None,
        }
    }

    fn incorporate(&mut self, other: Self, field_name: &'static str) {
        match (&mut self.total, other.total) {
            // An incoming `None` does not clear an already-known value.
            (_, None) => {}
            (None, Some(other)) => self.total = Some(other),
            (Some(this), Some(other)) => {
                // A regression is a bug; count it in the attached metric (or
                // log it, if no metric is attached) and keep the current value.
                if other < *this {
                    if let Some(metric) = &self.regressions {
                        metric.inc()
                    } else {
                        tracing::error!(
                            "total gauge {field_name} erroneously regressed from {} to {}",
                            this,
                            other
                        );
                    }
                    return;
                }
                *this = other
            }
        }
    }
}
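
// A minimal sketch (not part of the original source) of the `Total` semantics
// above: `None` updates are ignored, and regressions are dropped rather than
// applied.
#[cfg(test)]
mod total_sketch {
    use super::*;

    #[test]
    fn total_ignores_none_and_regressions() {
        let mut t = Total::from(Some(100));

        // An unknown update does not clear a known value.
        t.incorporate(Total::from(None), "offset_known");
        assert_eq!(t, Total::from(Some(100)));

        // A regression is logged (no metric is attached here) and dropped.
        t.incorporate(Total::from(Some(90)), "offset_known");
        assert_eq!(t, Total::from(Some(100)));

        // A forward move is applied.
        t.incorporate(Total::from(Some(150)), "offset_known");
        assert_eq!(t, Total::from(Some(150)));
    }
}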

/// A wrapper that marks a statistic as a gauge, i.e. a value that is reported
/// whole each time rather than as an increment.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct Gauge<T>(T);

impl<T> Gauge<T> {
    /// Construct a gauge from any value convertible into the underlying
    /// metric type.
    pub fn gauge<F>(f: F) -> Self
    where
        T: From<F>,
    {
        Gauge(f.into())
    }
}

impl<T: StorageMetric> StorageMetric for Gauge<T> {
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        Gauge(T::summarize(values.into_iter().map(|i| &i.0)))
    }

    fn incorporate(&mut self, other: Self, field_name: &'static str) {
        self.0.incorporate(other.0, field_name)
    }
}
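
// A minimal sketch (not part of the original source) showing how `Gauge`
// simply delegates to its inner metric: a `Gauge<Boolean>` never moves back
// from `true` to `false`, per the `Boolean::incorporate` rule above.
#[cfg(test)]
mod boolean_gauge_sketch {
    use super::*;

    #[test]
    fn boolean_gauge_never_regresses() {
        let mut g: Gauge<Boolean> = Gauge::gauge(false);
        g.incorporate(Gauge::gauge(true), "snapshot_committed");
        assert_eq!(g, Gauge(Boolean(true)));

        // An erroneous `false` after `true` is logged and dropped.
        g.incorporate(Gauge::gauge(false), "snapshot_committed");
        assert_eq!(g, Gauge(Boolean(true)));
    }
}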

/// Statistics that can be packed into, and unpacked from, a `Row` of the
/// corresponding raw statistics collection.
pub trait PackableStats {
    /// Pack `self` into the `Row`.
    fn pack(&self, packer: mz_repr::RowPacker<'_>);
    /// Unpack a `Row` back into a value, along with its id and (if present)
    /// replica id.
    fn unpack(
        row: Row,
        metrics: &crate::metrics::StorageControllerMetrics,
    ) -> (GlobalId, Option<ReplicaId>, Self);
}

/// Statistics that know when they were last updated, so stale entries can be
/// expired.
pub trait ExpirableStats {
    /// The time at which these statistics were last updated.
    fn last_updated(&self) -> Instant;
}

/// Statistics that need an initial zero-valued row to be emitted before their
/// first real update.
pub trait ZeroInitializedStats {
    /// Whether the zero-valued row still needs to be emitted.
    fn needs_zero_initialization(&self) -> bool;

    /// Mark the zero-valued row as having been emitted.
    fn mark_zero_initialized(&mut self);

    /// A zero-valued copy of these statistics.
    fn zero_stat(&self) -> Self;
}

/// An update of the statistics for a source, as reported by a single worker.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SourceStatisticsUpdate {
    pub id: GlobalId,

    pub messages_received: Counter,
    pub bytes_received: Counter,
    pub updates_staged: Counter,
    pub updates_committed: Counter,

    pub records_indexed: Gauge<ResettingTotal>,
    pub bytes_indexed: Gauge<ResettingTotal>,
    pub rehydration_latency_ms: Gauge<ResettingLatency>,
    pub snapshot_records_known: Gauge<ResettingNullableTotal>,
    pub snapshot_records_staged: Gauge<ResettingNullableTotal>,

    pub snapshot_committed: Gauge<Boolean>,
    pub offset_known: Gauge<Total>,
    pub offset_committed: Gauge<Total>,
}

impl SourceStatisticsUpdate {
    pub fn new(id: GlobalId) -> Self {
        Self {
            id,
            messages_received: Default::default(),
            bytes_received: Default::default(),
            updates_staged: Default::default(),
            updates_committed: Default::default(),
            records_indexed: Default::default(),
            bytes_indexed: Default::default(),
            rehydration_latency_ms: Default::default(),
            snapshot_records_known: Default::default(),
            snapshot_records_staged: Default::default(),
            snapshot_committed: Default::default(),
            offset_known: Default::default(),
            offset_committed: Default::default(),
        }
    }

    /// Summarize a set of updates for the same source into a single update.
    ///
    /// `values` is a closure producing a fresh iterator because the set is
    /// traversed once per field.
    pub fn summarize<'a, I, F>(values: F) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        F: Fn() -> I,
        Self: 'a,
    {
        SourceStatisticsUpdate {
            id: values().into_iter().next().unwrap().id,
            messages_received: Counter::summarize(
                values().into_iter().map(|s| &s.messages_received),
            ),
            bytes_received: Counter::summarize(values().into_iter().map(|s| &s.bytes_received)),
            updates_staged: Counter::summarize(values().into_iter().map(|s| &s.updates_staged)),
            updates_committed: Counter::summarize(
                values().into_iter().map(|s| &s.updates_committed),
            ),
            records_indexed: Gauge::summarize(values().into_iter().map(|s| &s.records_indexed)),
            bytes_indexed: Gauge::summarize(values().into_iter().map(|s| &s.bytes_indexed)),
            rehydration_latency_ms: Gauge::summarize(
                values().into_iter().map(|s| &s.rehydration_latency_ms),
            ),
            snapshot_records_known: Gauge::summarize(
                values().into_iter().map(|s| &s.snapshot_records_known),
            ),
            snapshot_records_staged: Gauge::summarize(
                values().into_iter().map(|s| &s.snapshot_records_staged),
            ),
            snapshot_committed: Gauge::summarize(
                values().into_iter().map(|s| &s.snapshot_committed),
            ),
            offset_known: Gauge::summarize(values().into_iter().map(|s| &s.offset_known)),
            offset_committed: Gauge::summarize(values().into_iter().map(|s| &s.offset_committed)),
        }
    }

    /// Reset all counter fields to 0.
    pub fn reset_counters(&mut self) {
        self.messages_received.0 = 0;
        self.bytes_received.0 = 0;
        self.updates_staged.0 = 0;
        self.updates_committed.0 = 0;
    }

    /// Reset all resettable gauge fields to their unknown/zero state.
    pub fn reset_gauges(&mut self) {
        self.records_indexed.0.reset();
        self.bytes_indexed.0.reset();
        self.rehydration_latency_ms.0.reset();
        self.snapshot_records_known.0.reset();
        self.snapshot_records_staged.0.reset();
    }

    /// Incorporate a newer update for the same source into this one.
    pub fn incorporate(&mut self, other: SourceStatisticsUpdate) {
        let SourceStatisticsUpdate {
            messages_received,
            bytes_received,
            updates_staged,
            updates_committed,
            records_indexed,
            bytes_indexed,
            rehydration_latency_ms,
            snapshot_records_known,
            snapshot_records_staged,
            snapshot_committed,
            offset_known,
            offset_committed,
            ..
        } = self;

        messages_received.incorporate(other.messages_received, "messages_received");
        bytes_received.incorporate(other.bytes_received, "bytes_received");
        updates_staged.incorporate(other.updates_staged, "updates_staged");
        updates_committed.incorporate(other.updates_committed, "updates_committed");
        records_indexed.incorporate(other.records_indexed, "records_indexed");
        bytes_indexed.incorporate(other.bytes_indexed, "bytes_indexed");
        rehydration_latency_ms.incorporate(other.rehydration_latency_ms, "rehydration_latency_ms");
        snapshot_records_known.incorporate(other.snapshot_records_known, "snapshot_records_known");
        snapshot_records_staged
            .incorporate(other.snapshot_records_staged, "snapshot_records_staged");
        snapshot_committed.incorporate(other.snapshot_committed, "snapshot_committed");
        offset_known.incorporate(other.offset_known, "offset_known");
        offset_committed.incorporate(other.offset_committed, "offset_committed");
    }

    /// Incorporate only the counter fields of `other`, leaving gauges untouched.
    pub fn incorporate_counters(&mut self, other: SourceStatisticsUpdate) {
        let SourceStatisticsUpdate {
            messages_received,
            bytes_received,
            updates_staged,
            updates_committed,
            ..
        } = self;

        messages_received.incorporate(other.messages_received, "messages_received");
        bytes_received.incorporate(other.bytes_received, "bytes_received");
        updates_staged.incorporate(other.updates_staged, "updates_staged");
        updates_committed.incorporate(other.updates_committed, "updates_committed");
    }

    /// Attach controller metrics used to record `offset_known` regressions.
    pub fn with_metrics(mut self, metrics: &crate::metrics::StorageControllerMetrics) -> Self {
        self.offset_known.0.regressions = Some(metrics.regressed_offset_known(self.id));
        self
    }
}
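
// A minimal sketch (not part of the original source) of the closure-based
// `summarize` API above: summarizing two updates for the same source sums the
// counters. The id `u1` is an arbitrary example value.
#[cfg(test)]
mod summarize_sketch {
    use super::*;

    #[test]
    fn summarize_two_updates() {
        let id: GlobalId = "u1".parse().unwrap();

        let mut a = SourceStatisticsUpdate::new(id);
        a.messages_received = Counter(10);
        let mut b = SourceStatisticsUpdate::new(id);
        b.messages_received = Counter(5);

        // `summarize` takes a closure so it can iterate the set once per field.
        let merged = SourceStatisticsUpdate::summarize(|| [&a, &b]);
        assert_eq!(merged.messages_received, Counter(15));
    }
}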

impl PackableStats for SourceStatisticsUpdate {
    fn pack(&self, mut packer: mz_repr::RowPacker<'_>) {
        use mz_repr::Datum;
        packer.push(Datum::from(self.id.to_string().as_str()));
        packer.push(Datum::from(self.messages_received.0));
        packer.push(Datum::from(self.bytes_received.0));
        packer.push(Datum::from(self.updates_staged.0));
        packer.push(Datum::from(self.updates_committed.0));
        packer.push(Datum::from(self.records_indexed.0.0));
        packer.push(Datum::from(self.bytes_indexed.0.0));
        let rehydration_latency = self
            .rehydration_latency_ms
            .0
            .0
            .map(|ms| mz_repr::adt::interval::Interval::new(0, 0, ms * 1000));
        packer.push(Datum::from(rehydration_latency));
        packer.push(Datum::from(self.snapshot_records_known.0.0));
        packer.push(Datum::from(self.snapshot_records_staged.0.0));
        packer.push(Datum::from(self.snapshot_committed.0.0));
        packer.push(Datum::from(self.offset_known.0.pack()));
        packer.push(Datum::from(self.offset_committed.0.pack()));
    }

    fn unpack(
        row: Row,
        metrics: &crate::metrics::StorageControllerMetrics,
    ) -> (GlobalId, Option<ReplicaId>, Self) {
        let mut iter = row.iter();
        let mut s = Self {
            id: iter.next().unwrap().unwrap_str().parse().unwrap(),

            messages_received: iter.next().unwrap().unwrap_uint64().into(),
            bytes_received: iter.next().unwrap().unwrap_uint64().into(),
            updates_staged: iter.next().unwrap().unwrap_uint64().into(),
            updates_committed: iter.next().unwrap().unwrap_uint64().into(),

            records_indexed: Gauge::gauge(iter.next().unwrap().unwrap_uint64()),
            bytes_indexed: Gauge::gauge(iter.next().unwrap().unwrap_uint64()),
            rehydration_latency_ms: Gauge::gauge(
                <Option<mz_repr::adt::interval::Interval>>::try_from(iter.next().unwrap())
                    .unwrap()
                    .map(|int| int.micros / 1000),
            ),
            snapshot_records_known: Gauge::gauge(
                <Option<u64>>::try_from(iter.next().unwrap()).unwrap(),
            ),
            snapshot_records_staged: Gauge::gauge(
                <Option<u64>>::try_from(iter.next().unwrap()).unwrap(),
            ),

            snapshot_committed: Gauge::gauge(iter.next().unwrap().unwrap_bool()),
            offset_known: Gauge::gauge(Some(iter.next().unwrap().unwrap_uint64())),
            offset_committed: Gauge::gauge(Some(iter.next().unwrap().unwrap_uint64())),
        };

        s.offset_known.0.regressions = Some(metrics.regressed_offset_known(s.id));
        (s.id, None, s)
    }
}
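
// A minimal sketch (not part of the original source), assuming `Row::default`
// and `Row::packer` from `mz_repr`: packing a `SourceStatisticsUpdate` yields
// one datum per column of `MZ_SOURCE_STATISTICS_RAW_DESC` except `replica_id`
// (13 in total); the controller-side type below packs `replica_id` as well.
#[cfg(test)]
mod pack_sketch {
    use super::*;

    #[test]
    fn pack_produces_a_row() {
        let id: GlobalId = "u1".parse().unwrap();
        let update = SourceStatisticsUpdate::new(id);

        let mut row = Row::default();
        update.pack(row.packer());
        assert_eq!(row.iter().count(), 13);
    }
}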

/// The controller's aggregated statistics for a single source, optionally
/// scoped to a replica.
#[derive(Clone, Debug, PartialEq)]
pub struct ControllerSourceStatistics {
    pub id: GlobalId,

    /// The replica that reported these statistics, if any.
    pub replica_id: Option<ReplicaId>,

    pub messages_received: Counter,
    pub bytes_received: Counter,
    pub updates_staged: Counter,
    pub updates_committed: Counter,

    pub records_indexed: Gauge<ResettingTotal>,
    pub bytes_indexed: Gauge<ResettingTotal>,
    pub rehydration_latency_ms: Gauge<ResettingLatency>,
    pub snapshot_records_known: Gauge<ResettingNullableTotal>,
    pub snapshot_records_staged: Gauge<ResettingNullableTotal>,

    pub snapshot_committed: Gauge<Boolean>,
    pub offset_known: Gauge<Total>,
    pub offset_committed: Gauge<Total>,

    /// When these statistics last received an update, used to expire stale
    /// entries.
    pub last_updated: Instant,

    /// Whether a zero-valued row still needs to be emitted for this entry.
    needs_zero_initialization: bool,
}

impl ControllerSourceStatistics {
    pub fn new(id: GlobalId, replica_id: Option<ReplicaId>) -> Self {
        Self {
            id,
            replica_id,
            messages_received: Default::default(),
            bytes_received: Default::default(),
            updates_staged: Default::default(),
            updates_committed: Default::default(),
            records_indexed: Default::default(),
            bytes_indexed: Default::default(),
            rehydration_latency_ms: Default::default(),
            snapshot_records_known: Default::default(),
            snapshot_records_staged: Default::default(),
            snapshot_committed: Default::default(),
            offset_known: Default::default(),
            offset_committed: Default::default(),
            last_updated: Instant::now(),
            needs_zero_initialization: true,
        }
    }

    /// Incorporate an update reported for this source, refreshing
    /// `last_updated`.
    pub fn incorporate(&mut self, update: SourceStatisticsUpdate) {
        let ControllerSourceStatistics {
            id: _,
            replica_id: _,
            messages_received,
            bytes_received,
            updates_staged,
            updates_committed,
            records_indexed,
            bytes_indexed,
            rehydration_latency_ms,
            snapshot_records_known,
            snapshot_records_staged,
            snapshot_committed,
            offset_known,
            offset_committed,
            last_updated,
            needs_zero_initialization: _,
        } = self;

        messages_received.incorporate(update.messages_received, "messages_received");
        bytes_received.incorporate(update.bytes_received, "bytes_received");
        updates_staged.incorporate(update.updates_staged, "updates_staged");
        updates_committed.incorporate(update.updates_committed, "updates_committed");
        records_indexed.incorporate(update.records_indexed, "records_indexed");
        bytes_indexed.incorporate(update.bytes_indexed, "bytes_indexed");
        rehydration_latency_ms.incorporate(update.rehydration_latency_ms, "rehydration_latency_ms");
        snapshot_records_known.incorporate(update.snapshot_records_known, "snapshot_records_known");
        snapshot_records_staged
            .incorporate(update.snapshot_records_staged, "snapshot_records_staged");
        snapshot_committed.incorporate(update.snapshot_committed, "snapshot_committed");
        offset_known.incorporate(update.offset_known, "offset_known");
        offset_committed.incorporate(update.offset_committed, "offset_committed");

        *last_updated = Instant::now();
    }
}

impl PackableStats for ControllerSourceStatistics {
    fn pack(&self, mut packer: mz_repr::RowPacker<'_>) {
        use mz_repr::Datum;
        packer.push(Datum::from(self.id.to_string().as_str()));
        if let Some(replica_id) = self.replica_id {
            packer.push(Datum::from(replica_id.to_string().as_str()));
        } else {
            packer.push(Datum::Null);
        }
        packer.push(Datum::from(self.messages_received.0));
        packer.push(Datum::from(self.bytes_received.0));
        packer.push(Datum::from(self.updates_staged.0));
        packer.push(Datum::from(self.updates_committed.0));
        packer.push(Datum::from(self.records_indexed.0.0));
        packer.push(Datum::from(self.bytes_indexed.0.0));
        let rehydration_latency = self
            .rehydration_latency_ms
            .0
            .0
            .map(|ms| mz_repr::adt::interval::Interval::new(0, 0, ms * 1000));
        packer.push(Datum::from(rehydration_latency));
        packer.push(Datum::from(self.snapshot_records_known.0.0));
        packer.push(Datum::from(self.snapshot_records_staged.0.0));
        packer.push(Datum::from(self.snapshot_committed.0.0));
        packer.push(Datum::from(self.offset_known.0.pack()));
        packer.push(Datum::from(self.offset_committed.0.pack()));
    }

    fn unpack(
        row: Row,
        metrics: &crate::metrics::StorageControllerMetrics,
    ) -> (GlobalId, Option<ReplicaId>, Self) {
        let mut iter = row.iter();
        let id = iter.next().unwrap().unwrap_str().parse().unwrap();
        let replica_id_or_null = iter.next().unwrap();

        let replica_id = if replica_id_or_null.is_null() {
            None
        } else {
            Some(
                replica_id_or_null
                    .unwrap_str()
                    .parse::<ReplicaId>()
                    .unwrap(),
            )
        };

        let mut s = Self {
            id,
            replica_id,

            messages_received: iter.next().unwrap().unwrap_uint64().into(),
            bytes_received: iter.next().unwrap().unwrap_uint64().into(),
            updates_staged: iter.next().unwrap().unwrap_uint64().into(),
            updates_committed: iter.next().unwrap().unwrap_uint64().into(),

            records_indexed: Gauge::gauge(iter.next().unwrap().unwrap_uint64()),
            bytes_indexed: Gauge::gauge(iter.next().unwrap().unwrap_uint64()),
            rehydration_latency_ms: Gauge::gauge(
                <Option<mz_repr::adt::interval::Interval>>::try_from(iter.next().unwrap())
                    .unwrap()
                    .map(|int| int.micros / 1000),
            ),
            snapshot_records_known: Gauge::gauge(
                <Option<u64>>::try_from(iter.next().unwrap()).unwrap(),
            ),
            snapshot_records_staged: Gauge::gauge(
                <Option<u64>>::try_from(iter.next().unwrap()).unwrap(),
            ),

            snapshot_committed: Gauge::gauge(iter.next().unwrap().unwrap_bool()),
            offset_known: Gauge::gauge(Some(iter.next().unwrap().unwrap_uint64())),
            offset_committed: Gauge::gauge(Some(iter.next().unwrap().unwrap_uint64())),
            last_updated: Instant::now(),
            // A row for this entry already exists, so no zero-initialization
            // is needed.
            needs_zero_initialization: false,
        };

        s.offset_known.0.regressions = Some(metrics.regressed_offset_known(s.id));
        (s.id, replica_id, s)
    }
}

impl ExpirableStats for ControllerSourceStatistics {
    fn last_updated(&self) -> Instant {
        self.last_updated
    }
}

impl ZeroInitializedStats for ControllerSourceStatistics {
    fn needs_zero_initialization(&self) -> bool {
        self.needs_zero_initialization
    }

    fn mark_zero_initialized(&mut self) {
        self.needs_zero_initialization = false;
    }

    fn zero_stat(&self) -> Self {
        ControllerSourceStatistics::new(self.id, self.replica_id)
    }
}
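
// A sketch (not part of the original source) of how the `ZeroInitializedStats`
// hooks compose; the surrounding controller presumably uses `zero_stat` to
// emit a zeroed row before the first real update, then marks the entry as
// initialized.
#[cfg(test)]
mod zero_init_sketch {
    use super::*;

    #[test]
    fn zero_initialization_happens_once() {
        let id: GlobalId = "u1".parse().unwrap();
        let mut stats = ControllerSourceStatistics::new(id, None);
        assert!(stats.needs_zero_initialization());

        // The zeroed copy carries the same identity but default values.
        let zeroed = stats.zero_stat();
        assert_eq!(zeroed.messages_received, Counter(0));

        stats.mark_zero_initialized();
        assert!(!stats.needs_zero_initialization());
    }
}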

/// The controller's aggregated statistics for a single sink on a single
/// replica.
#[derive(Clone, Debug, PartialEq)]
pub struct ControllerSinkStatistics {
    pub id: GlobalId,
    pub replica_id: ReplicaId,

    pub messages_staged: Counter,
    pub messages_committed: Counter,
    pub bytes_staged: Counter,
    pub bytes_committed: Counter,

    /// When these statistics last received an update, used to expire stale
    /// entries.
    pub last_updated: Instant,

    /// Whether a zero-valued row still needs to be emitted for this entry.
    needs_zero_initialization: bool,
}

impl ControllerSinkStatistics {
    pub fn new(id: GlobalId, replica_id: ReplicaId) -> Self {
        Self {
            id,
            replica_id,
            messages_staged: Default::default(),
            messages_committed: Default::default(),
            bytes_staged: Default::default(),
            bytes_committed: Default::default(),
            last_updated: Instant::now(),
            needs_zero_initialization: true,
        }
    }

    /// Incorporate an update reported for this sink, refreshing `last_updated`.
    pub fn incorporate(&mut self, update: SinkStatisticsUpdate) {
        let ControllerSinkStatistics {
            id: _,
            replica_id: _,
            messages_staged,
            messages_committed,
            bytes_staged,
            bytes_committed,
            last_updated,
            needs_zero_initialization: _,
        } = self;

        messages_staged.incorporate(update.messages_staged, "messages_staged");
        bytes_staged.incorporate(update.bytes_staged, "bytes_staged");
        messages_committed.incorporate(update.messages_committed, "messages_committed");
        bytes_committed.incorporate(update.bytes_committed, "bytes_committed");

        *last_updated = Instant::now();
    }
}

impl PackableStats for ControllerSinkStatistics {
    fn pack(&self, mut packer: mz_repr::RowPacker<'_>) {
        use mz_repr::Datum;
        packer.push(Datum::from(self.id.to_string().as_str()));
        packer.push(Datum::from(self.replica_id.to_string().as_str()));
        packer.push(Datum::from(self.messages_staged.0));
        packer.push(Datum::from(self.messages_committed.0));
        packer.push(Datum::from(self.bytes_staged.0));
        packer.push(Datum::from(self.bytes_committed.0));
    }

    fn unpack(
        row: Row,
        _metrics: &crate::metrics::StorageControllerMetrics,
    ) -> (GlobalId, Option<ReplicaId>, Self) {
        let mut iter = row.iter();
        let s = Self {
            id: iter.next().unwrap().unwrap_str().parse().unwrap(),
            replica_id: iter.next().unwrap().unwrap_str().parse().unwrap(),
            messages_staged: iter.next().unwrap().unwrap_uint64().into(),
            messages_committed: iter.next().unwrap().unwrap_uint64().into(),
            bytes_staged: iter.next().unwrap().unwrap_uint64().into(),
            bytes_committed: iter.next().unwrap().unwrap_uint64().into(),
            last_updated: Instant::now(),
            // A row for this entry already exists, so no zero-initialization
            // is needed.
            needs_zero_initialization: false,
        };
        (s.id, Some(s.replica_id), s)
    }
}

impl ExpirableStats for ControllerSinkStatistics {
    fn last_updated(&self) -> Instant {
        self.last_updated
    }
}

impl ZeroInitializedStats for ControllerSinkStatistics {
    fn needs_zero_initialization(&self) -> bool {
        self.needs_zero_initialization
    }

    fn mark_zero_initialized(&mut self) {
        self.needs_zero_initialization = false;
    }

    fn zero_stat(&self) -> Self {
        ControllerSinkStatistics::new(self.id, self.replica_id)
    }
}

/// An update of the statistics for a sink, as reported by a single worker.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SinkStatisticsUpdate {
    pub id: GlobalId,

    pub messages_staged: Counter,
    pub messages_committed: Counter,
    pub bytes_staged: Counter,
    pub bytes_committed: Counter,
}

impl SinkStatisticsUpdate {
    pub fn new(id: GlobalId) -> Self {
        Self {
            id,
            messages_staged: Default::default(),
            messages_committed: Default::default(),
            bytes_staged: Default::default(),
            bytes_committed: Default::default(),
        }
    }

    /// Incorporate a newer update for the same sink into this one.
    pub fn incorporate(&mut self, other: SinkStatisticsUpdate) {
        let SinkStatisticsUpdate {
            messages_staged,
            messages_committed,
            bytes_staged,
            bytes_committed,
            ..
        } = self;

        messages_staged.incorporate(other.messages_staged, "messages_staged");
        messages_committed.incorporate(other.messages_committed, "messages_committed");
        bytes_staged.incorporate(other.bytes_staged, "bytes_staged");
        bytes_committed.incorporate(other.bytes_committed, "bytes_committed");
    }

    /// Incorporate only the counter fields of `other`. Sinks currently have no
    /// gauges, so this is equivalent to [`SinkStatisticsUpdate::incorporate`].
    pub fn incorporate_counters(&mut self, other: SinkStatisticsUpdate) {
        let SinkStatisticsUpdate {
            messages_staged,
            messages_committed,
            bytes_staged,
            bytes_committed,
            ..
        } = self;

        messages_staged.incorporate(other.messages_staged, "messages_staged");
        messages_committed.incorporate(other.messages_committed, "messages_committed");
        bytes_staged.incorporate(other.bytes_staged, "bytes_staged");
        bytes_committed.incorporate(other.bytes_committed, "bytes_committed");
    }

    /// Summarize a set of updates for the same sink into a single update.
    ///
    /// `values` is a closure producing a fresh iterator because the set is
    /// traversed once per field.
    pub fn summarize<'a, I, F>(values: F) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        F: Fn() -> I,
        Self: 'a,
    {
        SinkStatisticsUpdate {
            id: values().into_iter().next().unwrap().id,
            messages_staged: Counter::summarize(values().into_iter().map(|s| &s.messages_staged)),
            messages_committed: Counter::summarize(
                values().into_iter().map(|s| &s.messages_committed),
            ),
            bytes_staged: Counter::summarize(values().into_iter().map(|s| &s.bytes_staged)),
            bytes_committed: Counter::summarize(values().into_iter().map(|s| &s.bytes_committed)),
        }
    }

    /// Reset all counter fields to 0.
    pub fn reset_counters(&mut self) {
        self.messages_staged.0 = 0;
        self.messages_committed.0 = 0;
        self.bytes_staged.0 = 0;
        self.bytes_committed.0 = 0;
    }

    /// Sinks currently have no gauges, so this is a no-op.
    pub fn reset_gauges(&self) {}
}

impl PackableStats for SinkStatisticsUpdate {
    fn pack(&self, mut packer: mz_repr::RowPacker<'_>) {
        use mz_repr::Datum;
        packer.push(Datum::from(self.id.to_string().as_str()));
        packer.push(Datum::from(self.messages_staged.0));
        packer.push(Datum::from(self.messages_committed.0));
        packer.push(Datum::from(self.bytes_staged.0));
        packer.push(Datum::from(self.bytes_committed.0));
    }

    fn unpack(
        row: Row,
        _metrics: &crate::metrics::StorageControllerMetrics,
    ) -> (GlobalId, Option<ReplicaId>, Self) {
        let mut iter = row.iter();
        let s = Self {
            id: iter.next().unwrap().unwrap_str().parse().unwrap(),
            messages_staged: iter.next().unwrap().unwrap_uint64().into(),
            messages_committed: iter.next().unwrap().unwrap_uint64().into(),
            bytes_staged: iter.next().unwrap().unwrap_uint64().into(),
            bytes_committed: iter.next().unwrap().unwrap_uint64().into(),
        };
        (s.id, None, s)
    }
}

/// Statistics for a webhook source, kept as atomics so they can be updated
/// concurrently and periodically drained into a [`SourceStatisticsUpdate`].
#[derive(Default, Debug)]
pub struct WebhookStatistics {
    pub messages_received: AtomicU64,
    pub bytes_received: AtomicU64,
    pub updates_staged: AtomicU64,
    pub updates_committed: AtomicU64,
}

impl WebhookStatistics {
    /// Drain the accumulated counters into a [`SourceStatisticsUpdate`] for the
    /// given id, resetting them to 0. Gauge fields are filled with fixed
    /// defaults (e.g. `snapshot_committed` is always `true`).
    pub fn drain_into_update(&self, id: GlobalId) -> SourceStatisticsUpdate {
        SourceStatisticsUpdate {
            id,
            messages_received: self.messages_received.swap(0, Ordering::Relaxed).into(),
            bytes_received: self.bytes_received.swap(0, Ordering::Relaxed).into(),
            updates_staged: self.updates_staged.swap(0, Ordering::Relaxed).into(),
            updates_committed: self.updates_committed.swap(0, Ordering::Relaxed).into(),
            records_indexed: Gauge::gauge(0),
            bytes_indexed: Gauge::gauge(0),
            rehydration_latency_ms: Gauge::gauge(None),
            snapshot_records_known: Gauge::gauge(None),
            snapshot_records_staged: Gauge::gauge(None),
            snapshot_committed: Gauge::gauge(true),
            offset_known: Gauge::gauge(None::<u64>),
            offset_committed: Gauge::gauge(None::<u64>),
        }
    }
}
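
// A minimal sketch (not part of the original source) of the webhook drain
// cycle: counters are bumped atomically, then swapped out (and zeroed) by
// `drain_into_update`, so a second drain reports zero.
#[cfg(test)]
mod webhook_drain_sketch {
    use std::sync::atomic::Ordering;

    use super::*;

    #[test]
    fn drain_resets_counters() {
        let stats = WebhookStatistics::default();
        stats.messages_received.fetch_add(2, Ordering::Relaxed);
        stats.bytes_received.fetch_add(128, Ordering::Relaxed);

        let id: GlobalId = "u1".parse().unwrap();
        let update = stats.drain_into_update(id);
        assert_eq!(update.messages_received, Counter(2));
        assert_eq!(update.bytes_received, Counter(128));

        // The atomics were zeroed by the swap.
        assert_eq!(stats.drain_into_update(id).messages_received, Counter(0));
    }
}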