mz_storage/metrics/
decode.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Utilities for tracking metrics related to decoding.
11
12use mz_ore::cast::CastFrom;
13use mz_ore::metric;
14use mz_ore::metrics::MetricsRegistry;
15use mz_ore::metrics::raw::IntCounterVec;
16
17use crate::decode::{DataDecoderInner, PreDelimitedFormat};
18
19/// Metrics specific to a single worker.
20#[derive(Clone, Debug)]
21pub struct DecodeMetricDefs {
22    events_read: IntCounterVec,
23}
24
25impl DecodeMetricDefs {
26    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
27        Self {
28            events_read: registry.register(metric!(
29                name: "mz_dataflow_events_read_total",
30                help: "Count of events we have read from the wire",
31                var_labels: ["format", "status"],
32            )),
33        }
34    }
35
36    fn counter_inc(&self, decoder: &DataDecoderInner, success: bool, n: usize) {
37        let format_label = match decoder {
38            DataDecoderInner::Avro(_) => "avro",
39            DataDecoderInner::Csv(_) => "csv",
40            DataDecoderInner::DelimitedBytes { format, .. }
41            | DataDecoderInner::PreDelimited(format) => match format {
42                PreDelimitedFormat::Bytes => "raw",
43                PreDelimitedFormat::Json => "json",
44                PreDelimitedFormat::Text => "text",
45                PreDelimitedFormat::Regex(..) => "regex",
46                PreDelimitedFormat::Protobuf(..) => "protobuf",
47            },
48        };
49        let success_label = if success { "success" } else { "error" };
50        self.events_read
51            .with_label_values(&[format_label, success_label])
52            .inc_by(u64::cast_from(n));
53    }
54
55    /// Create (if it doesn't exist yet) a success counter for the given decoder, and increment
56    /// it `n` times.
57    pub(crate) fn count_successes(&self, decoder: &DataDecoderInner, n: usize) {
58        self.counter_inc(decoder, true, n);
59    }
60
61    /// Create (if it doesn't exist yet) an error counter for the given decoder, and increment
62    /// it `n` times.
63    pub(crate) fn count_errors(&self, decoder: &DataDecoderInner, n: usize) {
64        self.counter_inc(decoder, true, n);
65    }
66}