Skip to main content

mz_storage/metrics/sink/
iceberg.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Metrics for iceberg sinks.
11
12use mz_ore::{
13    metric,
14    metrics::{
15        DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, HistogramVec, IntCounterVec,
16        MetricVisibility, UIntGaugeVec,
17    },
18    stats::histogram_seconds_buckets,
19};
20use mz_repr::GlobalId;
21use prometheus::core::AtomicU64;
22
23#[derive(Debug, Clone)]
24pub(crate) struct IcebergSinkMetricDefs {
25    /// Number of data files written by the iceberg sink.
26    pub data_files_written: IntCounterVec,
27    /// Number of delete files written by the iceberg sink.
28    pub delete_files_written: IntCounterVec,
29    /// Number of stashed rows in the iceberg sink.
30    pub stashed_rows: UIntGaugeVec,
31    /// Number of snapshots committed by the iceberg sink.
32    pub snapshots_committed: IntCounterVec,
33    /// Commit failures in the iceberg sink.
34    pub commit_failures: IntCounterVec,
35    /// Commit conflicts in the iceberg sink.
36    pub commit_conflicts: IntCounterVec,
37    /// Time spent committing batches to Iceberg.
38    pub commit_duration_seconds: HistogramVec,
39    /// Time spent closing Iceberg DeltaWriters.
40    pub writer_close_duration_seconds: HistogramVec,
41}
42
43impl IcebergSinkMetricDefs {
44    // Every metric must have a worker specific id associated with it. These are later wrapped
45    // in a DeleteOnDrop helper. If the label was just `source_id` and one worker completed, it
46    // would call the DeleteOnDrop code that deregestiers the metric for `source_id`. Other
47    // workers may still be running, but the metrics registry will no longer record or report
48    // metrics for that `source_id`.
49    pub(crate) fn register_with(registry: &mz_ore::metrics::MetricsRegistry) -> Self {
50        Self {
51            data_files_written: registry.register(metric!(
52                name: "mz_sink_iceberg_data_files_written",
53                help: "Number of data files written by the iceberg sink",
54                var_labels: ["sink_id", "worker_id"],
55                visibility: MetricVisibility::Public,
56            )),
57            delete_files_written: registry.register(metric!(
58                name: "mz_sink_iceberg_delete_files_written",
59                help: "Number of delete files written by the iceberg sink",
60                var_labels: ["sink_id", "worker_id"],
61                visibility: MetricVisibility::Public,
62            )),
63            stashed_rows: registry.register(metric!(
64                name: "mz_sink_iceberg_stashed_rows",
65                help: "Number of stashed rows in the iceberg sink",
66                var_labels: ["sink_id", "worker_id"]
67            )),
68            snapshots_committed: registry.register(metric!(
69                name: "mz_sink_iceberg_snapshots_committed",
70                help: "Number of snapshots committed by the iceberg sink",
71                var_labels: ["sink_id", "worker_id"],
72                visibility: MetricVisibility::Public,
73            )),
74            commit_failures: registry.register(metric!(
75                name: "mz_sink_iceberg_commit_failures",
76                help: "Number of commit failures in the iceberg sink",
77                var_labels: ["sink_id", "worker_id"],
78                visibility: MetricVisibility::Public,
79            )),
80            commit_conflicts: registry.register(metric!(
81                name: "mz_sink_iceberg_commit_conflicts",
82                help: "Number of commit conflicts in the iceberg sink",
83                var_labels: ["sink_id", "worker_id"],
84                visibility: MetricVisibility::Public,
85            )),
86            commit_duration_seconds: registry.register(metric!(
87                name: "mz_sink_iceberg_commit_duration_seconds",
88                help: "Time spent committing batches to Iceberg in seconds",
89                var_labels: ["sink_id", "worker_id"],
90                buckets: histogram_seconds_buckets(0.001, 32.0),
91                visibility: MetricVisibility::Public,
92            )),
93            writer_close_duration_seconds: registry.register(metric!(
94                name: "mz_sink_iceberg_writer_close_duration_seconds",
95                help: "Time spent closing Iceberg DeltaWriters in seconds",
96                var_labels: ["sink_id", "worker_id"],
97                buckets: histogram_seconds_buckets(0.001, 32.0),
98            )),
99        }
100    }
101}
102
103pub(crate) struct IcebergSinkMetrics {
104    /// Number of data files written by the iceberg sink.
105    pub data_files_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
106    /// Number of delete files written by the iceberg sink.
107    pub delete_files_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
108    /// Number of stashed rows in the iceberg sink.
109    pub stashed_rows: DeleteOnDropGauge<AtomicU64, Vec<String>>,
110    /// Number of snapshots committed by the iceberg sink.
111    pub snapshots_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
112    /// Number of commit failures in the iceberg sink.
113    pub commit_failures: DeleteOnDropCounter<AtomicU64, Vec<String>>,
114    /// Number of commit conflicts in the iceberg sink.
115    pub commit_conflicts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
116    /// Time spent committing batches to Iceberg.
117    pub commit_duration_seconds: DeleteOnDropHistogram<Vec<String>>,
118    /// Time spent closing Iceberg DeltaWriters.
119    pub writer_close_duration_seconds: DeleteOnDropHistogram<Vec<String>>,
120}
121
122impl IcebergSinkMetrics {
123    /// Create a `IcebergSinkMetrics` from the `IcebergSinkMetricDefs`.
124    pub(crate) fn new(defs: &IcebergSinkMetricDefs, sink_id: GlobalId, worker_id: usize) -> Self {
125        let labels = vec![sink_id.to_string(), worker_id.to_string()];
126        Self {
127            data_files_written: defs
128                .data_files_written
129                .get_delete_on_drop_metric(labels.clone()),
130            delete_files_written: defs
131                .delete_files_written
132                .get_delete_on_drop_metric(labels.clone()),
133            stashed_rows: defs.stashed_rows.get_delete_on_drop_metric(labels.clone()),
134            snapshots_committed: defs
135                .snapshots_committed
136                .get_delete_on_drop_metric(labels.clone()),
137            commit_failures: defs
138                .commit_failures
139                .get_delete_on_drop_metric(labels.clone()),
140            commit_conflicts: defs
141                .commit_conflicts
142                .get_delete_on_drop_metric(labels.clone()),
143            commit_duration_seconds: defs
144                .commit_duration_seconds
145                .get_delete_on_drop_metric(labels.clone()),
146            writer_close_duration_seconds: defs
147                .writer_close_duration_seconds
148                .get_delete_on_drop_metric(labels),
149        }
150    }
151}