// mz_storage/metrics/sink/iceberg.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Metrics for iceberg sinks.

12use mz_ore::{
13    metric,
14    metrics::{
15        DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, HistogramVec, IntCounterVec,
16        MetricTag, MetricVisibility, UIntGaugeVec,
17    },
18    stats::histogram_seconds_buckets,
19};
20use mz_repr::GlobalId;
21use prometheus::core::AtomicU64;
22
23#[derive(Debug, Clone)]
24pub(crate) struct IcebergSinkMetricDefs {
25    /// Number of data files written by the iceberg sink.
26    pub data_files_written: IntCounterVec,
27    /// Number of delete files written by the iceberg sink.
28    pub delete_files_written: IntCounterVec,
29    /// Number of stashed rows in the iceberg sink.
30    pub stashed_rows: UIntGaugeVec,
31    /// Number of snapshots committed by the iceberg sink.
32    pub snapshots_committed: IntCounterVec,
33    /// Commit failures in the iceberg sink.
34    pub commit_failures: IntCounterVec,
35    /// Commit conflicts in the iceberg sink.
36    pub commit_conflicts: IntCounterVec,
37    /// Time spent committing batches to Iceberg.
38    pub commit_duration_seconds: HistogramVec,
39    /// Time spent closing Iceberg DeltaWriters.
40    pub writer_close_duration_seconds: HistogramVec,
41}
42
43impl IcebergSinkMetricDefs {
44    // Every metric must have a worker specific id associated with it. These are later wrapped
45    // in a DeleteOnDrop helper. If the label was just `source_id` and one worker completed, it
46    // would call the DeleteOnDrop code that deregestiers the metric for `source_id`. Other
47    // workers may still be running, but the metrics registry will no longer record or report
48    // metrics for that `source_id`.
49    pub(crate) fn register_with(registry: &mz_ore::metrics::MetricsRegistry) -> Self {
50        Self {
51            data_files_written: registry.register(metric!(
52                name: "mz_sink_iceberg_data_files_written",
53                help: "Number of data files written by the iceberg sink",
54                var_labels: ["sink_id", "worker_id"],
55                visibility: MetricVisibility::Public,
56                tags: [MetricTag::Sink],
57            )),
58            delete_files_written: registry.register(metric!(
59                name: "mz_sink_iceberg_delete_files_written",
60                help: "Number of delete files written by the iceberg sink",
61                var_labels: ["sink_id", "worker_id"],
62                visibility: MetricVisibility::Public,
63                tags: [MetricTag::Sink],
64            )),
65            stashed_rows: registry.register(metric!(
66                name: "mz_sink_iceberg_stashed_rows",
67                help: "Number of stashed rows in the iceberg sink",
68                var_labels: ["sink_id", "worker_id"]
69            )),
70            snapshots_committed: registry.register(metric!(
71                name: "mz_sink_iceberg_snapshots_committed",
72                help: "Number of snapshots committed by the iceberg sink",
73                var_labels: ["sink_id", "worker_id"],
74                visibility: MetricVisibility::Public,
75                tags: [MetricTag::Sink],
76            )),
77            commit_failures: registry.register(metric!(
78                name: "mz_sink_iceberg_commit_failures",
79                help: "Number of commit failures in the iceberg sink",
80                var_labels: ["sink_id", "worker_id"],
81                visibility: MetricVisibility::Public,
82                tags: [MetricTag::Sink],
83            )),
84            commit_conflicts: registry.register(metric!(
85                name: "mz_sink_iceberg_commit_conflicts",
86                help: "Number of commit conflicts in the iceberg sink",
87                var_labels: ["sink_id", "worker_id"],
88                visibility: MetricVisibility::Public,
89                tags: [MetricTag::Sink],
90            )),
91            commit_duration_seconds: registry.register(metric!(
92                name: "mz_sink_iceberg_commit_duration_seconds",
93                help: "Time spent committing batches to Iceberg in seconds",
94                var_labels: ["sink_id", "worker_id"],
95                buckets: histogram_seconds_buckets(0.001, 32.0),
96                visibility: MetricVisibility::Public,
97                tags: [MetricTag::Sink],
98            )),
99            writer_close_duration_seconds: registry.register(metric!(
100                name: "mz_sink_iceberg_writer_close_duration_seconds",
101                help: "Time spent closing Iceberg DeltaWriters in seconds",
102                var_labels: ["sink_id", "worker_id"],
103                buckets: histogram_seconds_buckets(0.001, 32.0),
104            )),
105        }
106    }
107}
108
109pub(crate) struct IcebergSinkMetrics {
110    /// Number of data files written by the iceberg sink.
111    pub data_files_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
112    /// Number of delete files written by the iceberg sink.
113    pub delete_files_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
114    /// Number of stashed rows in the iceberg sink.
115    pub stashed_rows: DeleteOnDropGauge<AtomicU64, Vec<String>>,
116    /// Number of snapshots committed by the iceberg sink.
117    pub snapshots_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
118    /// Number of commit failures in the iceberg sink.
119    pub commit_failures: DeleteOnDropCounter<AtomicU64, Vec<String>>,
120    /// Number of commit conflicts in the iceberg sink.
121    pub commit_conflicts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
122    /// Time spent committing batches to Iceberg.
123    pub commit_duration_seconds: DeleteOnDropHistogram<Vec<String>>,
124    /// Time spent closing Iceberg DeltaWriters.
125    pub writer_close_duration_seconds: DeleteOnDropHistogram<Vec<String>>,
126}
127
128impl IcebergSinkMetrics {
129    /// Create a `IcebergSinkMetrics` from the `IcebergSinkMetricDefs`.
130    pub(crate) fn new(defs: &IcebergSinkMetricDefs, sink_id: GlobalId, worker_id: usize) -> Self {
131        let labels = vec![sink_id.to_string(), worker_id.to_string()];
132        Self {
133            data_files_written: defs
134                .data_files_written
135                .get_delete_on_drop_metric(labels.clone()),
136            delete_files_written: defs
137                .delete_files_written
138                .get_delete_on_drop_metric(labels.clone()),
139            stashed_rows: defs.stashed_rows.get_delete_on_drop_metric(labels.clone()),
140            snapshots_committed: defs
141                .snapshots_committed
142                .get_delete_on_drop_metric(labels.clone()),
143            commit_failures: defs
144                .commit_failures
145                .get_delete_on_drop_metric(labels.clone()),
146            commit_conflicts: defs
147                .commit_conflicts
148                .get_delete_on_drop_metric(labels.clone()),
149            commit_duration_seconds: defs
150                .commit_duration_seconds
151                .get_delete_on_drop_metric(labels.clone()),
152            writer_close_duration_seconds: defs
153                .writer_close_duration_seconds
154                .get_delete_on_drop_metric(labels),
155        }
156    }
157}