1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910//! Utilities for tracking metrics related to decoding.
1112use mz_ore::cast::CastFrom;
13use mz_ore::metric;
14use mz_ore::metrics::MetricsRegistry;
15use mz_ore::metrics::raw::IntCounterVec;
1617use crate::decode::{DataDecoderInner, PreDelimitedFormat};
1819/// Metrics specific to a single worker.
20#[derive(Clone, Debug)]
21pub struct DecodeMetricDefs {
22 events_read: IntCounterVec,
23}
2425impl DecodeMetricDefs {
26pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
27Self {
28 events_read: registry.register(metric!(
29 name: "mz_dataflow_events_read_total",
30 help: "Count of events we have read from the wire",
31 var_labels: ["format", "status"],
32 )),
33 }
34 }
3536fn counter_inc(&self, decoder: &DataDecoderInner, success: bool, n: usize) {
37let format_label = match decoder {
38 DataDecoderInner::Avro(_) => "avro",
39 DataDecoderInner::Csv(_) => "csv",
40 DataDecoderInner::DelimitedBytes { format, .. }
41 | DataDecoderInner::PreDelimited(format) => match format {
42 PreDelimitedFormat::Bytes => "raw",
43 PreDelimitedFormat::Json => "json",
44 PreDelimitedFormat::Text => "text",
45 PreDelimitedFormat::Regex(..) => "regex",
46 PreDelimitedFormat::Protobuf(..) => "protobuf",
47 },
48 };
49let success_label = if success { "success" } else { "error" };
50self.events_read
51 .with_label_values(&[format_label, success_label])
52 .inc_by(u64::cast_from(n));
53 }
5455/// Create (if it doesn't exist yet) a success counter for the given decoder, and increment
56 /// it `n` times.
57pub(crate) fn count_successes(&self, decoder: &DataDecoderInner, n: usize) {
58self.counter_inc(decoder, true, n);
59 }
6061/// Create (if it doesn't exist yet) an error counter for the given decoder, and increment
62 /// it `n` times.
63pub(crate) fn count_errors(&self, decoder: &DataDecoderInner, n: usize) {
64self.counter_inc(decoder, true, n);
65 }
66}