mz_storage/metrics/
decode.rs1use mz_ore::cast::CastFrom;
13use mz_ore::metric;
14use mz_ore::metrics::MetricsRegistry;
15use mz_ore::metrics::raw::IntCounterVec;
16
17use crate::decode::{DataDecoderInner, PreDelimitedFormat};
18
19#[derive(Clone, Debug)]
21pub struct DecodeMetricDefs {
22 events_read: IntCounterVec,
23}
24
25impl DecodeMetricDefs {
26 pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
27 Self {
28 events_read: registry.register(metric!(
29 name: "mz_dataflow_events_read_total",
30 help: "Count of events we have read from the wire",
31 var_labels: ["format", "status"],
32 )),
33 }
34 }
35
36 fn counter_inc(&self, decoder: &DataDecoderInner, success: bool, n: usize) {
37 let format_label = match decoder {
38 DataDecoderInner::Avro(_) => "avro",
39 DataDecoderInner::Csv(_) => "csv",
40 DataDecoderInner::DelimitedBytes { format, .. }
41 | DataDecoderInner::PreDelimited(format) => match format {
42 PreDelimitedFormat::Bytes => "raw",
43 PreDelimitedFormat::Json => "json",
44 PreDelimitedFormat::Text => "text",
45 PreDelimitedFormat::Regex(..) => "regex",
46 PreDelimitedFormat::Protobuf(..) => "protobuf",
47 },
48 };
49 let success_label = if success { "success" } else { "error" };
50 self.events_read
51 .with_label_values(&[format_label, success_label])
52 .inc_by(u64::cast_from(n));
53 }
54
55 pub(crate) fn count_successes(&self, decoder: &DataDecoderInner, n: usize) {
58 self.counter_inc(decoder, true, n);
59 }
60
61 pub(crate) fn count_errors(&self, decoder: &DataDecoderInner, n: usize) {
64 self.counter_inc(decoder, true, n);
65 }
66}