1use std::sync::LazyLock;
20use std::sync::atomic::{AtomicU64, Ordering};
21use std::time::Instant;
22
23use mz_controller_types::ReplicaId;
24use serde::{Deserialize, Serialize};
25
26use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError};
27use mz_repr::{GlobalId, RelationDesc, Row, ScalarType};
28
29include!(concat!(env!("OUT_DIR"), "/mz_storage_client.statistics.rs"));
30
31pub static MZ_SOURCE_STATISTICS_RAW_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
32 RelationDesc::builder()
33 .with_column("id", ScalarType::String.nullable(false))
35 .with_column("replica_id", ScalarType::String.nullable(true))
37 .with_column("messages_received", ScalarType::UInt64.nullable(false))
43 .with_column("bytes_received", ScalarType::UInt64.nullable(false))
46 .with_column("updates_staged", ScalarType::UInt64.nullable(false))
49 .with_column("updates_committed", ScalarType::UInt64.nullable(false))
52 .with_column("records_indexed", ScalarType::UInt64.nullable(false))
58 .with_column("bytes_indexed", ScalarType::UInt64.nullable(false))
61 .with_column("rehydration_latency", ScalarType::Interval.nullable(true))
65 .with_column("snapshot_records_known", ScalarType::UInt64.nullable(true))
73 .with_column("snapshot_records_staged", ScalarType::UInt64.nullable(true))
80 .with_column("snapshot_committed", ScalarType::Bool.nullable(false))
85 .with_column("offset_known", ScalarType::UInt64.nullable(true))
94 .with_column("offset_committed", ScalarType::UInt64.nullable(true))
97 .finish()
98});
99
100pub static MZ_SINK_STATISTICS_RAW_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
101 RelationDesc::builder()
102 .with_column("id", ScalarType::String.nullable(false))
104 .with_column("replica_id", ScalarType::String.nullable(true))
106 .with_column("messages_staged", ScalarType::UInt64.nullable(false))
112 .with_column("messages_committed", ScalarType::UInt64.nullable(false))
115 .with_column("bytes_staged", ScalarType::UInt64.nullable(false))
118 .with_column("bytes_committed", ScalarType::UInt64.nullable(false))
121 .finish()
122});
123
124pub trait StorageMetric {
129 fn summarize<'a, I>(values: I) -> Self
132 where
133 I: IntoIterator<Item = &'a Self>,
134 Self: Sized + 'a;
135
136 fn incorporate(&mut self, other: Self, field_name: &'static str);
138}
139
140#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
142pub struct Counter(u64);
143
144impl StorageMetric for Counter {
145 fn summarize<'a, I>(values: I) -> Self
146 where
147 I: IntoIterator<Item = &'a Self>,
148 Self: Sized + 'a,
149 {
150 Self(values.into_iter().map(|c| c.0).sum())
152 }
153
154 fn incorporate(&mut self, other: Self, _field_name: &'static str) {
155 self.0 += other.0
157 }
158}
159
160impl From<u64> for Counter {
161 fn from(f: u64) -> Self {
162 Counter(f)
163 }
164}
165
166#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
168pub struct ResettingLatency(Option<i64>);
169
170impl From<Option<i64>> for ResettingLatency {
171 fn from(f: Option<i64>) -> Self {
172 ResettingLatency(f)
173 }
174}
175
176impl StorageMetric for ResettingLatency {
177 fn summarize<'a, I>(values: I) -> Self
178 where
179 I: IntoIterator<Item = &'a Self>,
180 Self: Sized + 'a,
181 {
182 let mut max = 0;
183 for value in values {
184 match value.0 {
185 None => return Self(None),
187 Some(value) => max = std::cmp::max(max, value),
189 }
190 }
191
192 Self(Some(max))
193 }
194
195 fn incorporate(&mut self, other: Self, _field_name: &'static str) {
196 self.0 = other.0;
198 }
199}
200
201impl ResettingLatency {
202 fn reset(&mut self) {
203 self.0 = None;
204 }
205}
206
207#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
209pub struct ResettingNullableTotal(Option<u64>);
210
211impl StorageMetric for ResettingNullableTotal {
212 fn summarize<'a, I>(values: I) -> Self
213 where
214 I: IntoIterator<Item = &'a Self>,
215 Self: Sized + 'a,
216 {
217 let mut sum = 0;
218 for value in values {
219 match value.0 {
220 None => return Self(None),
222 Some(value) => sum += value,
224 }
225 }
226
227 Self(Some(sum))
228 }
229
230 fn incorporate(&mut self, other: Self, _field_name: &'static str) {
231 match (&mut self.0, other.0) {
232 (None, other) => {
233 self.0 = other;
234 }
235 (Some(this), Some(other)) => *this = other,
237 (Some(_), None) => {
238 }
240 }
241 }
242}
243
244impl From<Option<u64>> for ResettingNullableTotal {
245 fn from(f: Option<u64>) -> Self {
246 ResettingNullableTotal(f)
247 }
248}
249
250impl ResettingNullableTotal {
251 fn reset(&mut self) {
252 self.0 = None;
253 }
254}
255
256#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
258pub struct ResettingTotal(u64);
259
260impl StorageMetric for ResettingTotal {
261 fn summarize<'a, I>(values: I) -> Self
262 where
263 I: IntoIterator<Item = &'a Self>,
264 Self: Sized + 'a,
265 {
266 Self(values.into_iter().map(|c| c.0).sum())
268 }
269
270 fn incorporate(&mut self, other: Self, _field_name: &'static str) {
271 self.0 = other.0;
273 }
274}
275
276impl From<u64> for ResettingTotal {
277 fn from(f: u64) -> Self {
278 ResettingTotal(f)
279 }
280}
281
282impl ResettingTotal {
283 fn reset(&mut self) {
284 self.0 = 0;
285 }
286}
287
288#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
290pub struct Boolean(bool);
291
292impl StorageMetric for Boolean {
293 fn summarize<'a, I>(values: I) -> Self
294 where
295 I: IntoIterator<Item = &'a Self>,
296 Self: Sized + 'a,
297 {
298 Self(values.into_iter().fold(true, |base, new| base & new.0))
300 }
301
302 fn incorporate(&mut self, other: Self, field_name: &'static str) {
303 #[allow(clippy::bool_comparison)]
307 if other.0 < self.0 {
308 tracing::error!(
309 "boolean gauge for field {field_name} erroneously regressed from true to false",
310 );
311 return;
312 }
313 self.0 = other.0;
314 }
315}
316
317impl From<bool> for Boolean {
318 fn from(f: bool) -> Self {
319 Boolean(f)
320 }
321}
322
323#[derive(Debug, Serialize, Deserialize, Default)]
325pub struct Total {
326 total: Option<u64>,
328 #[serde(skip)]
331 regressions:
332 Option<mz_ore::metrics::DeleteOnDropCounter<prometheus::core::AtomicU64, Vec<String>>>,
333}
334
335impl From<Option<u64>> for Total {
336 fn from(f: Option<u64>) -> Self {
337 Total {
338 total: f,
339 regressions: None,
340 }
341 }
342}
343
344impl Clone for Total {
345 fn clone(&self) -> Self {
346 Self {
347 total: self.total,
348 regressions: None,
349 }
350 }
351}
352
353impl PartialEq for Total {
354 fn eq(&self, other: &Self) -> bool {
355 self.total == other.total
356 }
357}
358
359impl Total {
360 fn pack(&self) -> u64 {
362 self.total.unwrap_or_default()
363 }
364}
365
366impl StorageMetric for Total {
367 fn summarize<'a, I>(values: I) -> Self
368 where
369 I: IntoIterator<Item = &'a Self>,
370 Self: Sized + 'a,
371 {
372 let mut any_none = false;
375
376 let inner = values
377 .into_iter()
378 .filter_map(|i| {
379 any_none |= i.total.is_none();
380 i.total.as_ref()
381 })
382 .sum();
383
384 Self {
387 total: (!any_none).then_some(inner),
388 regressions: None,
389 }
390 }
391
392 fn incorporate(&mut self, other: Self, field_name: &'static str) {
393 match (&mut self.total, other.total) {
394 (_, None) => {}
395 (None, Some(other)) => self.total = Some(other),
396 (Some(this), Some(other)) => {
397 if other < *this {
398 if let Some(metric) = &self.regressions {
399 metric.inc()
400 } else {
401 tracing::error!(
402 "total gauge {field_name} erroneously regressed from {} to {}",
403 this,
404 other
405 );
406 }
407 return;
408 }
409 *this = other
410 }
411 }
412 }
413}
414
415#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
417pub struct Gauge<T>(T);
418
419impl<T> Gauge<T> {
420 pub fn gauge<F>(f: F) -> Self
422 where
423 T: From<F>,
424 {
425 Gauge(f.into())
426 }
427}
428
429impl<T: StorageMetric> StorageMetric for Gauge<T> {
430 fn summarize<'a, I>(values: I) -> Self
431 where
432 I: IntoIterator<Item = &'a Self>,
433 Self: Sized + 'a,
434 {
435 Gauge(T::summarize(values.into_iter().map(|i| &i.0)))
436 }
437
438 fn incorporate(&mut self, other: Self, field_name: &'static str) {
439 self.0.incorporate(other.0, field_name)
440 }
441}
442
443pub trait PackableStats {
446 fn pack(&self, packer: mz_repr::RowPacker<'_>);
448 fn unpack(
450 row: Row,
451 metrics: &crate::metrics::StorageControllerMetrics,
452 ) -> (GlobalId, Option<ReplicaId>, Self);
453}
454
455pub trait ExpirableStats {
457 fn last_updated(&self) -> Instant;
460}
461
462pub trait ZeroInitializedStats {
472 fn needs_zero_initialization(&self) -> bool;
474
475 fn mark_zero_initialized(&mut self);
477
478 fn zero_stat(&self) -> Self;
481}
482
483#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
487pub struct SourceStatisticsUpdate {
488 pub id: GlobalId,
489
490 pub messages_received: Counter,
491 pub bytes_received: Counter,
492 pub updates_staged: Counter,
493 pub updates_committed: Counter,
494
495 pub records_indexed: Gauge<ResettingTotal>,
496 pub bytes_indexed: Gauge<ResettingTotal>,
497 pub rehydration_latency_ms: Gauge<ResettingLatency>,
498 pub snapshot_records_known: Gauge<ResettingNullableTotal>,
499 pub snapshot_records_staged: Gauge<ResettingNullableTotal>,
500
501 pub snapshot_committed: Gauge<Boolean>,
502 pub offset_known: Gauge<Total>,
521 pub offset_committed: Gauge<Total>,
522}
523
524impl SourceStatisticsUpdate {
525 pub fn new(id: GlobalId) -> Self {
526 Self {
527 id,
528 messages_received: Default::default(),
529 bytes_received: Default::default(),
530 updates_staged: Default::default(),
531 updates_committed: Default::default(),
532 records_indexed: Default::default(),
533 bytes_indexed: Default::default(),
534 rehydration_latency_ms: Default::default(),
535 snapshot_records_known: Default::default(),
536 snapshot_records_staged: Default::default(),
537 snapshot_committed: Default::default(),
538 offset_known: Default::default(),
539 offset_committed: Default::default(),
540 }
541 }
542
543 pub fn summarize<'a, I, F>(values: F) -> Self
544 where
545 I: IntoIterator<Item = &'a Self>,
546 F: Fn() -> I,
547 Self: 'a,
548 {
549 SourceStatisticsUpdate {
550 id: values().into_iter().next().unwrap().id,
551 messages_received: Counter::summarize(
552 values().into_iter().map(|s| &s.messages_received),
553 ),
554 bytes_received: Counter::summarize(values().into_iter().map(|s| &s.bytes_received)),
555 updates_staged: Counter::summarize(values().into_iter().map(|s| &s.updates_staged)),
556 updates_committed: Counter::summarize(
557 values().into_iter().map(|s| &s.updates_committed),
558 ),
559 records_indexed: Gauge::summarize(values().into_iter().map(|s| &s.records_indexed)),
560 bytes_indexed: Gauge::summarize(values().into_iter().map(|s| &s.bytes_indexed)),
561 rehydration_latency_ms: Gauge::summarize(
562 values().into_iter().map(|s| &s.rehydration_latency_ms),
563 ),
564 snapshot_records_known: Gauge::summarize(
565 values().into_iter().map(|s| &s.snapshot_records_known),
566 ),
567 snapshot_records_staged: Gauge::summarize(
568 values().into_iter().map(|s| &s.snapshot_records_staged),
569 ),
570 snapshot_committed: Gauge::summarize(
571 values().into_iter().map(|s| &s.snapshot_committed),
572 ),
573 offset_known: Gauge::summarize(values().into_iter().map(|s| &s.offset_known)),
574 offset_committed: Gauge::summarize(values().into_iter().map(|s| &s.offset_committed)),
575 }
576 }
577
578 pub fn reset_counters(&mut self) {
580 self.messages_received.0 = 0;
581 self.bytes_received.0 = 0;
582 self.updates_staged.0 = 0;
583 self.updates_committed.0 = 0;
584 }
585
586 pub fn reset_gauges(&mut self) {
588 self.records_indexed.0.reset();
589 self.bytes_indexed.0.reset();
590 self.rehydration_latency_ms.0.reset();
591 self.snapshot_records_known.0.reset();
592 self.snapshot_records_staged.0.reset();
593 }
594
595 pub fn incorporate(&mut self, other: SourceStatisticsUpdate) {
596 let SourceStatisticsUpdate {
597 messages_received,
598 bytes_received,
599 updates_staged,
600 updates_committed,
601 records_indexed,
602 bytes_indexed,
603 rehydration_latency_ms,
604 snapshot_records_known,
605 snapshot_records_staged,
606 snapshot_committed,
607 offset_known,
608 offset_committed,
609 ..
610 } = self;
611
612 messages_received.incorporate(other.messages_received, "messages_received");
613 bytes_received.incorporate(other.bytes_received, "bytes_received");
614 updates_staged.incorporate(other.updates_staged, "updates_staged");
615 updates_committed.incorporate(other.updates_committed, "updates_committed");
616 records_indexed.incorporate(other.records_indexed, "records_indexed");
617 bytes_indexed.incorporate(other.bytes_indexed, "bytes_indexed");
618 rehydration_latency_ms.incorporate(other.rehydration_latency_ms, "rehydration_latency_ms");
619 snapshot_records_known.incorporate(other.snapshot_records_known, "snapshot_records_known");
620 snapshot_records_staged
621 .incorporate(other.snapshot_records_staged, "snapshot_records_staged");
622 snapshot_committed.incorporate(other.snapshot_committed, "snapshot_committed");
623 offset_known.incorporate(other.offset_known, "offset_known");
624 offset_committed.incorporate(other.offset_committed, "offset_committed");
625 }
626
627 pub fn incorporate_counters(&mut self, other: SourceStatisticsUpdate) {
629 let SourceStatisticsUpdate {
630 messages_received,
631 bytes_received,
632 updates_staged,
633 updates_committed,
634 ..
635 } = self;
636
637 messages_received.incorporate(other.messages_received, "messages_received");
638 bytes_received.incorporate(other.bytes_received, "bytes_received");
639 updates_staged.incorporate(other.updates_staged, "updates_staged");
640 updates_committed.incorporate(other.updates_committed, "updates_committed");
641 }
642
643 pub fn with_metrics(mut self, metrics: &crate::metrics::StorageControllerMetrics) -> Self {
645 self.offset_known.0.regressions = Some(metrics.regressed_offset_known(self.id));
646 self
647 }
648}
649
650impl PackableStats for SourceStatisticsUpdate {
651 fn pack(&self, mut packer: mz_repr::RowPacker<'_>) {
652 use mz_repr::Datum;
653 packer.push(Datum::from(self.id.to_string().as_str()));
655 packer.push(Datum::from(self.messages_received.0));
657 packer.push(Datum::from(self.bytes_received.0));
658 packer.push(Datum::from(self.updates_staged.0));
659 packer.push(Datum::from(self.updates_committed.0));
660 packer.push(Datum::from(self.records_indexed.0.0));
662 packer.push(Datum::from(self.bytes_indexed.0.0));
663 let rehydration_latency = self
664 .rehydration_latency_ms
665 .0
666 .0
667 .map(|ms| mz_repr::adt::interval::Interval::new(0, 0, ms * 1000));
668 packer.push(Datum::from(rehydration_latency));
669 packer.push(Datum::from(self.snapshot_records_known.0.0));
670 packer.push(Datum::from(self.snapshot_records_staged.0.0));
671 packer.push(Datum::from(self.snapshot_committed.0.0));
673 packer.push(Datum::from(self.offset_known.0.pack()));
674 packer.push(Datum::from(self.offset_committed.0.pack()));
675 }
676
677 fn unpack(
678 row: Row,
679 metrics: &crate::metrics::StorageControllerMetrics,
680 ) -> (GlobalId, Option<ReplicaId>, Self) {
681 let mut iter = row.iter();
682 let mut s = Self {
683 id: iter.next().unwrap().unwrap_str().parse().unwrap(),
684
685 messages_received: iter.next().unwrap().unwrap_uint64().into(),
686 bytes_received: iter.next().unwrap().unwrap_uint64().into(),
687 updates_staged: iter.next().unwrap().unwrap_uint64().into(),
688 updates_committed: iter.next().unwrap().unwrap_uint64().into(),
689
690 records_indexed: Gauge::gauge(iter.next().unwrap().unwrap_uint64()),
691 bytes_indexed: Gauge::gauge(iter.next().unwrap().unwrap_uint64()),
692 rehydration_latency_ms: Gauge::gauge(
693 <Option<mz_repr::adt::interval::Interval>>::try_from(iter.next().unwrap())
694 .unwrap()
695 .map(|int| int.micros / 1000),
696 ),
697 snapshot_records_known: Gauge::gauge(
698 <Option<u64>>::try_from(iter.next().unwrap()).unwrap(),
699 ),
700 snapshot_records_staged: Gauge::gauge(
701 <Option<u64>>::try_from(iter.next().unwrap()).unwrap(),
702 ),
703
704 snapshot_committed: Gauge::gauge(iter.next().unwrap().unwrap_bool()),
705 offset_known: Gauge::gauge(Some(iter.next().unwrap().unwrap_uint64())),
706 offset_committed: Gauge::gauge(Some(iter.next().unwrap().unwrap_uint64())),
707 };
708
709 s.offset_known.0.regressions = Some(metrics.regressed_offset_known(s.id));
710 (s.id, None, s)
711 }
712}
713
714impl RustType<ProtoSourceStatisticsUpdate> for SourceStatisticsUpdate {
715 fn into_proto(&self) -> ProtoSourceStatisticsUpdate {
716 ProtoSourceStatisticsUpdate {
717 id: Some(self.id.into_proto()),
718
719 messages_received: self.messages_received.0,
720 bytes_received: self.bytes_received.0,
721 updates_staged: self.updates_staged.0,
722 updates_committed: self.updates_committed.0,
723
724 records_indexed: self.records_indexed.0.0,
725 bytes_indexed: self.bytes_indexed.0.0,
726 rehydration_latency_ms: self.rehydration_latency_ms.0.0,
727 snapshot_records_known: self.snapshot_records_known.0.0,
728 snapshot_records_staged: self.snapshot_records_staged.0.0,
729
730 snapshot_committed: self.snapshot_committed.0.0,
731 offset_known: self.offset_known.0.total,
732 offset_committed: self.offset_committed.0.total,
733 }
734 }
735
736 fn from_proto(proto: ProtoSourceStatisticsUpdate) -> Result<Self, TryFromProtoError> {
737 Ok(SourceStatisticsUpdate {
738 id: proto
739 .id
740 .into_rust_if_some("ProtoSourceStatisticsUpdate::id")?,
741
742 messages_received: Counter(proto.messages_received),
743 bytes_received: Counter(proto.bytes_received),
744 updates_staged: Counter(proto.updates_staged),
745 updates_committed: Counter(proto.updates_committed),
746
747 records_indexed: Gauge::gauge(proto.records_indexed),
748 bytes_indexed: Gauge::gauge(proto.bytes_indexed),
749 rehydration_latency_ms: Gauge::gauge(proto.rehydration_latency_ms),
750 snapshot_records_known: Gauge::gauge(proto.snapshot_records_known),
751 snapshot_records_staged: Gauge::gauge(proto.snapshot_records_staged),
752
753 snapshot_committed: Gauge::gauge(proto.snapshot_committed),
754 offset_known: Gauge::gauge(proto.offset_known),
755 offset_committed: Gauge::gauge(proto.offset_committed),
756 })
757 }
758}
759
760#[derive(Clone, Debug, PartialEq)]
766pub struct ControllerSourceStatistics {
767 pub id: GlobalId,
768
769 pub replica_id: Option<ReplicaId>,
773
774 pub messages_received: Counter,
775 pub bytes_received: Counter,
776 pub updates_staged: Counter,
777 pub updates_committed: Counter,
778
779 pub records_indexed: Gauge<ResettingTotal>,
780 pub bytes_indexed: Gauge<ResettingTotal>,
781 pub rehydration_latency_ms: Gauge<ResettingLatency>,
782 pub snapshot_records_known: Gauge<ResettingNullableTotal>,
783 pub snapshot_records_staged: Gauge<ResettingNullableTotal>,
784
785 pub snapshot_committed: Gauge<Boolean>,
786 pub offset_known: Gauge<Total>,
787 pub offset_committed: Gauge<Total>,
788
789 pub last_updated: Instant,
798
799 needs_zero_initialization: bool,
801}
802
803impl ControllerSourceStatistics {
804 pub fn new(id: GlobalId, replica_id: Option<ReplicaId>) -> Self {
805 Self {
806 id,
807 replica_id,
808 messages_received: Default::default(),
809 bytes_received: Default::default(),
810 updates_staged: Default::default(),
811 updates_committed: Default::default(),
812 records_indexed: Default::default(),
813 bytes_indexed: Default::default(),
814 rehydration_latency_ms: Default::default(),
815 snapshot_records_known: Default::default(),
816 snapshot_records_staged: Default::default(),
817 snapshot_committed: Default::default(),
818 offset_known: Default::default(),
819 offset_committed: Default::default(),
820 last_updated: Instant::now(),
821 needs_zero_initialization: true,
822 }
823 }
824
825 pub fn incorporate(&mut self, update: SourceStatisticsUpdate) {
828 let ControllerSourceStatistics {
829 id: _,
830 replica_id: _,
831 messages_received,
832 bytes_received,
833 updates_staged,
834 updates_committed,
835 records_indexed,
836 bytes_indexed,
837 rehydration_latency_ms,
838 snapshot_records_known,
839 snapshot_records_staged,
840 snapshot_committed,
841 offset_known,
842 offset_committed,
843 last_updated,
844 needs_zero_initialization: _,
845 } = self;
846
847 messages_received.incorporate(update.messages_received, "messages_received");
848 bytes_received.incorporate(update.bytes_received, "bytes_received");
849 updates_staged.incorporate(update.updates_staged, "updates_staged");
850 updates_committed.incorporate(update.updates_committed, "updates_committed");
851 records_indexed.incorporate(update.records_indexed, "records_indexed");
852 bytes_indexed.incorporate(update.bytes_indexed, "bytes_indexed");
853 rehydration_latency_ms.incorporate(update.rehydration_latency_ms, "rehydration_latency_ms");
854 snapshot_records_known.incorporate(update.snapshot_records_known, "snapshot_records_known");
855 snapshot_records_staged
856 .incorporate(update.snapshot_records_staged, "snapshot_records_staged");
857 snapshot_committed.incorporate(update.snapshot_committed, "snapshot_committed");
858 offset_known.incorporate(update.offset_known, "offset_known");
859 offset_committed.incorporate(update.offset_committed, "offset_committed");
860
861 *last_updated = Instant::now();
862 }
863}
864
865impl PackableStats for ControllerSourceStatistics {
866 fn pack(&self, mut packer: mz_repr::RowPacker<'_>) {
867 use mz_repr::Datum;
868 packer.push(Datum::from(self.id.to_string().as_str()));
870 if let Some(replica_id) = self.replica_id {
872 packer.push(Datum::from(replica_id.to_string().as_str()));
873 } else {
874 packer.push(Datum::Null);
875 }
876 packer.push(Datum::from(self.messages_received.0));
878 packer.push(Datum::from(self.bytes_received.0));
879 packer.push(Datum::from(self.updates_staged.0));
880 packer.push(Datum::from(self.updates_committed.0));
881 packer.push(Datum::from(self.records_indexed.0.0));
883 packer.push(Datum::from(self.bytes_indexed.0.0));
884 let rehydration_latency = self
885 .rehydration_latency_ms
886 .0
887 .0
888 .map(|ms| mz_repr::adt::interval::Interval::new(0, 0, ms * 1000));
889 packer.push(Datum::from(rehydration_latency));
890 packer.push(Datum::from(self.snapshot_records_known.0.0));
891 packer.push(Datum::from(self.snapshot_records_staged.0.0));
892 packer.push(Datum::from(self.snapshot_committed.0.0));
894 packer.push(Datum::from(self.offset_known.0.pack()));
895 packer.push(Datum::from(self.offset_committed.0.pack()));
896 }
897
898 fn unpack(
899 row: Row,
900 metrics: &crate::metrics::StorageControllerMetrics,
901 ) -> (GlobalId, Option<ReplicaId>, Self) {
902 let mut iter = row.iter();
903 let id = iter.next().unwrap().unwrap_str().parse().unwrap();
904 let replica_id_or_null = iter.next().unwrap();
905
906 let replica_id = if replica_id_or_null.is_null() {
907 None
908 } else {
909 Some(
910 replica_id_or_null
911 .unwrap_str()
912 .parse::<ReplicaId>()
913 .unwrap(),
914 )
915 };
916
917 let mut s = Self {
918 id,
919 replica_id,
920
921 messages_received: iter.next().unwrap().unwrap_uint64().into(),
922 bytes_received: iter.next().unwrap().unwrap_uint64().into(),
923 updates_staged: iter.next().unwrap().unwrap_uint64().into(),
924 updates_committed: iter.next().unwrap().unwrap_uint64().into(),
925
926 records_indexed: Gauge::gauge(iter.next().unwrap().unwrap_uint64()),
927 bytes_indexed: Gauge::gauge(iter.next().unwrap().unwrap_uint64()),
928 rehydration_latency_ms: Gauge::gauge(
929 <Option<mz_repr::adt::interval::Interval>>::try_from(iter.next().unwrap())
930 .unwrap()
931 .map(|int| int.micros / 1000),
932 ),
933 snapshot_records_known: Gauge::gauge(
934 <Option<u64>>::try_from(iter.next().unwrap()).unwrap(),
935 ),
936 snapshot_records_staged: Gauge::gauge(
937 <Option<u64>>::try_from(iter.next().unwrap()).unwrap(),
938 ),
939
940 snapshot_committed: Gauge::gauge(iter.next().unwrap().unwrap_bool()),
941 offset_known: Gauge::gauge(Some(iter.next().unwrap().unwrap_uint64())),
942 offset_committed: Gauge::gauge(Some(iter.next().unwrap().unwrap_uint64())),
943 last_updated: Instant::now(),
944 needs_zero_initialization: false,
947 };
948
949 s.offset_known.0.regressions = Some(metrics.regressed_offset_known(s.id));
950 (s.id, replica_id, s)
951 }
952}
953
954impl ExpirableStats for ControllerSourceStatistics {
955 fn last_updated(&self) -> Instant {
956 self.last_updated
957 }
958}
959
960impl ZeroInitializedStats for ControllerSourceStatistics {
961 fn needs_zero_initialization(&self) -> bool {
962 self.needs_zero_initialization
963 }
964
965 fn mark_zero_initialized(&mut self) {
966 self.needs_zero_initialization = false;
967 }
968
969 fn zero_stat(&self) -> Self {
970 ControllerSourceStatistics::new(self.id, self.replica_id)
971 }
972}
973
974#[derive(Clone, Debug, PartialEq)]
980pub struct ControllerSinkStatistics {
981 pub id: GlobalId,
982 pub replica_id: ReplicaId,
983
984 pub messages_staged: Counter,
985 pub messages_committed: Counter,
986 pub bytes_staged: Counter,
987 pub bytes_committed: Counter,
988
989 pub last_updated: Instant,
991
992 needs_zero_initialization: bool,
994}
995
996impl ControllerSinkStatistics {
997 pub fn new(id: GlobalId, replica_id: ReplicaId) -> Self {
998 Self {
999 id,
1000 replica_id,
1001 messages_staged: Default::default(),
1002 messages_committed: Default::default(),
1003 bytes_staged: Default::default(),
1004 bytes_committed: Default::default(),
1005 last_updated: Instant::now(),
1006 needs_zero_initialization: true,
1007 }
1008 }
1009
1010 pub fn incorporate(&mut self, update: SinkStatisticsUpdate) {
1012 let ControllerSinkStatistics {
1013 id: _,
1014 replica_id: _,
1015 messages_staged,
1016 messages_committed,
1017 bytes_staged,
1018 bytes_committed,
1019 last_updated,
1020 needs_zero_initialization: _,
1021 } = self;
1022
1023 messages_staged.incorporate(update.messages_staged, "messages_staged");
1024 bytes_staged.incorporate(update.bytes_staged, "bytes_staged");
1025 messages_committed.incorporate(update.messages_committed, "messages_committed");
1026 bytes_committed.incorporate(update.bytes_committed, "bytes_committed");
1027
1028 *last_updated = Instant::now();
1029 }
1030}
1031
1032impl PackableStats for ControllerSinkStatistics {
1033 fn pack(&self, mut packer: mz_repr::RowPacker<'_>) {
1034 use mz_repr::Datum;
1035 packer.push(Datum::from(self.id.to_string().as_str()));
1036 packer.push(Datum::from(self.replica_id.to_string().as_str()));
1037 packer.push(Datum::from(self.messages_staged.0));
1038 packer.push(Datum::from(self.messages_committed.0));
1039 packer.push(Datum::from(self.bytes_staged.0));
1040 packer.push(Datum::from(self.bytes_committed.0));
1041 }
1042
1043 fn unpack(
1044 row: Row,
1045 _metrics: &crate::metrics::StorageControllerMetrics,
1046 ) -> (GlobalId, Option<ReplicaId>, Self) {
1047 let mut iter = row.iter();
1048 let s = Self {
1049 id: iter.next().unwrap().unwrap_str().parse().unwrap(),
1050 replica_id: iter.next().unwrap().unwrap_str().parse().unwrap(),
1051 messages_staged: iter.next().unwrap().unwrap_uint64().into(),
1052 messages_committed: iter.next().unwrap().unwrap_uint64().into(),
1053 bytes_staged: iter.next().unwrap().unwrap_uint64().into(),
1054 bytes_committed: iter.next().unwrap().unwrap_uint64().into(),
1055 last_updated: Instant::now(),
1056 needs_zero_initialization: false,
1059 };
1060 (s.id, Some(s.replica_id), s)
1061 }
1062}
1063
1064impl ExpirableStats for ControllerSinkStatistics {
1065 fn last_updated(&self) -> Instant {
1066 self.last_updated
1067 }
1068}
1069
1070impl ZeroInitializedStats for ControllerSinkStatistics {
1071 fn needs_zero_initialization(&self) -> bool {
1072 self.needs_zero_initialization
1073 }
1074
1075 fn mark_zero_initialized(&mut self) {
1076 self.needs_zero_initialization = false;
1077 }
1078
1079 fn zero_stat(&self) -> Self {
1080 ControllerSinkStatistics::new(self.id, self.replica_id)
1081 }
1082}
1083
1084#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
1088pub struct SinkStatisticsUpdate {
1089 pub id: GlobalId,
1090
1091 pub messages_staged: Counter,
1092 pub messages_committed: Counter,
1093 pub bytes_staged: Counter,
1094 pub bytes_committed: Counter,
1095}
1096
1097impl SinkStatisticsUpdate {
1098 pub fn new(id: GlobalId) -> Self {
1099 Self {
1100 id,
1101 messages_staged: Default::default(),
1102 messages_committed: Default::default(),
1103 bytes_staged: Default::default(),
1104 bytes_committed: Default::default(),
1105 }
1106 }
1107
1108 pub fn incorporate(&mut self, other: SinkStatisticsUpdate) {
1109 let SinkStatisticsUpdate {
1110 messages_staged,
1111 messages_committed,
1112 bytes_staged,
1113 bytes_committed,
1114 ..
1115 } = self;
1116
1117 messages_staged.incorporate(other.messages_staged, "messages_staged");
1118 messages_committed.incorporate(other.messages_committed, "messages_committed");
1119 bytes_staged.incorporate(other.bytes_staged, "bytes_staged");
1120 bytes_committed.incorporate(other.bytes_committed, "bytes_committed");
1121 }
1122
1123 pub fn incorporate_counters(&mut self, other: SinkStatisticsUpdate) {
1125 let SinkStatisticsUpdate {
1126 messages_staged,
1127 messages_committed,
1128 bytes_staged,
1129 bytes_committed,
1130 ..
1131 } = self;
1132
1133 messages_staged.incorporate(other.messages_staged, "messages_staged");
1134 messages_committed.incorporate(other.messages_committed, "messages_committed");
1135 bytes_staged.incorporate(other.bytes_staged, "bytes_staged");
1136 bytes_committed.incorporate(other.bytes_committed, "bytes_committed");
1137 }
1138
1139 pub fn summarize<'a, I, F>(values: F) -> Self
1140 where
1141 I: IntoIterator<Item = &'a Self>,
1142 F: Fn() -> I,
1143 Self: 'a,
1144 {
1145 SinkStatisticsUpdate {
1146 id: values().into_iter().next().unwrap().id,
1147 messages_staged: Counter::summarize(values().into_iter().map(|s| &s.messages_staged)),
1148 messages_committed: Counter::summarize(
1149 values().into_iter().map(|s| &s.messages_committed),
1150 ),
1151 bytes_staged: Counter::summarize(values().into_iter().map(|s| &s.bytes_staged)),
1152 bytes_committed: Counter::summarize(values().into_iter().map(|s| &s.bytes_committed)),
1153 }
1154 }
1155
1156 pub fn reset_counters(&mut self) {
1158 self.messages_staged.0 = 0;
1159 self.messages_committed.0 = 0;
1160 self.bytes_staged.0 = 0;
1161 self.bytes_committed.0 = 0;
1162 }
1163
1164 pub fn reset_gauges(&self) {}
1166}
1167
1168impl PackableStats for SinkStatisticsUpdate {
1169 fn pack(&self, mut packer: mz_repr::RowPacker<'_>) {
1170 use mz_repr::Datum;
1171 packer.push(Datum::from(self.id.to_string().as_str()));
1172 packer.push(Datum::from(self.messages_staged.0));
1173 packer.push(Datum::from(self.messages_committed.0));
1174 packer.push(Datum::from(self.bytes_staged.0));
1175 packer.push(Datum::from(self.bytes_committed.0));
1176 }
1177
1178 fn unpack(
1179 row: Row,
1180 _metrics: &crate::metrics::StorageControllerMetrics,
1181 ) -> (GlobalId, Option<ReplicaId>, Self) {
1182 let mut iter = row.iter();
1183 let s = Self {
1184 id: iter.next().unwrap().unwrap_str().parse().unwrap(),
1186 messages_staged: iter.next().unwrap().unwrap_uint64().into(),
1188 messages_committed: iter.next().unwrap().unwrap_uint64().into(),
1189 bytes_staged: iter.next().unwrap().unwrap_uint64().into(),
1190 bytes_committed: iter.next().unwrap().unwrap_uint64().into(),
1191 };
1192 (s.id, None, s)
1193 }
1194}
1195
1196impl RustType<ProtoSinkStatisticsUpdate> for SinkStatisticsUpdate {
1197 fn into_proto(&self) -> ProtoSinkStatisticsUpdate {
1198 ProtoSinkStatisticsUpdate {
1199 id: Some(self.id.into_proto()),
1200
1201 messages_staged: self.messages_staged.0,
1202 messages_committed: self.messages_committed.0,
1203 bytes_staged: self.bytes_staged.0,
1204 bytes_committed: self.bytes_committed.0,
1205 }
1206 }
1207
1208 fn from_proto(proto: ProtoSinkStatisticsUpdate) -> Result<Self, TryFromProtoError> {
1209 Ok(SinkStatisticsUpdate {
1210 id: proto
1211 .id
1212 .into_rust_if_some("ProtoSinkStatisticsUpdate::id")?,
1213
1214 messages_staged: Counter(proto.messages_staged),
1215 messages_committed: Counter(proto.messages_committed),
1216 bytes_staged: Counter(proto.bytes_staged),
1217 bytes_committed: Counter(proto.bytes_committed),
1218 })
1219 }
1220}
1221
1222#[derive(Default, Debug)]
1224pub struct WebhookStatistics {
1225 pub messages_received: AtomicU64,
1226 pub bytes_received: AtomicU64,
1227 pub updates_staged: AtomicU64,
1228 pub updates_committed: AtomicU64,
1229}
1230
1231impl WebhookStatistics {
1232 pub fn drain_into_update(&self, id: GlobalId) -> SourceStatisticsUpdate {
1235 SourceStatisticsUpdate {
1236 id,
1237 messages_received: self.messages_received.swap(0, Ordering::Relaxed).into(),
1238 bytes_received: self.bytes_received.swap(0, Ordering::Relaxed).into(),
1239 updates_staged: self.updates_staged.swap(0, Ordering::Relaxed).into(),
1240 updates_committed: self.updates_committed.swap(0, Ordering::Relaxed).into(),
1241 records_indexed: Gauge::gauge(0),
1242 bytes_indexed: Gauge::gauge(0),
1243 rehydration_latency_ms: Gauge::gauge(None),
1244 snapshot_records_known: Gauge::gauge(None),
1245 snapshot_records_staged: Gauge::gauge(None),
1246 snapshot_committed: Gauge::gauge(true),
1247 offset_known: Gauge::gauge(None::<u64>),
1248 offset_committed: Gauge::gauge(None::<u64>),
1249 }
1250 }
1251}