Skip to main content

mz_storage/metrics/sink/
iceberg.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Metrics for iceberg sinks.
11
12use mz_ore::{
13    metric,
14    metrics::{
15        DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, HistogramVec, IntCounterVec,
16        UIntGaugeVec,
17    },
18    stats::histogram_seconds_buckets,
19};
20use mz_repr::GlobalId;
21use prometheus::core::AtomicU64;
22
23#[derive(Debug, Clone)]
24pub(crate) struct IcebergSinkMetricDefs {
25    /// Number of data files written by the iceberg sink.
26    pub data_files_written: IntCounterVec,
27    /// Number of delete files written by the iceberg sink.
28    pub delete_files_written: IntCounterVec,
29    /// Number of stashed rows in the iceberg sink.
30    pub stashed_rows: UIntGaugeVec,
31    /// Number of snapshots committed by the iceberg sink.
32    pub snapshots_committed: IntCounterVec,
33    /// Commit failures in the iceberg sink.
34    pub commit_failures: IntCounterVec,
35    /// Commit conflicts in the iceberg sink.
36    pub commit_conflicts: IntCounterVec,
37    /// Time spent committing batches to Iceberg.
38    pub commit_duration_seconds: HistogramVec,
39    /// Time spent closing Iceberg DeltaWriters.
40    pub writer_close_duration_seconds: HistogramVec,
41}
42
43impl IcebergSinkMetricDefs {
44    // Every metric must have a worker specific id associated with it. These are later wrapped
45    // in a DeleteOnDrop helper. If the label was just `source_id` and one worker completed, it
46    // would call the DeleteOnDrop code that deregestiers the metric for `source_id`. Other
47    // workers may still be running, but the metrics registry will no longer record or report
48    // metrics for that `source_id`.
49    pub(crate) fn register_with(registry: &mz_ore::metrics::MetricsRegistry) -> Self {
50        Self {
51            data_files_written: registry.register(metric!(
52                name: "mz_sink_iceberg_data_files_written",
53                help: "Number of data files written by the iceberg sink",
54                var_labels: ["sink_id", "worker_id"]
55            )),
56            delete_files_written: registry.register(metric!(
57                name: "mz_sink_iceberg_delete_files_written",
58                help: "Number of delete files written by the iceberg sink",
59                var_labels: ["sink_id", "worker_id"]
60            )),
61            stashed_rows: registry.register(metric!(
62                name: "mz_sink_iceberg_stashed_rows",
63                help: "Number of stashed rows in the iceberg sink",
64                var_labels: ["sink_id", "worker_id"]
65            )),
66            snapshots_committed: registry.register(metric!(
67                name: "mz_sink_iceberg_snapshots_committed",
68                help: "Number of snapshots committed by the iceberg sink",
69                var_labels: ["sink_id", "worker_id"]
70            )),
71            commit_failures: registry.register(metric!(
72                name: "mz_sink_iceberg_commit_failures",
73                help: "Number of commit failures in the iceberg sink",
74                var_labels: ["sink_id", "worker_id"]
75            )),
76            commit_conflicts: registry.register(metric!(
77                name: "mz_sink_iceberg_commit_conflicts",
78                help: "Number of commit conflicts in the iceberg sink",
79                var_labels: ["sink_id", "worker_id"]
80            )),
81            commit_duration_seconds: registry.register(metric!(
82                name: "mz_sink_iceberg_commit_duration_seconds",
83                help: "Time spent committing batches to Iceberg in seconds",
84                var_labels: ["sink_id", "worker_id"],
85                buckets: histogram_seconds_buckets(0.001, 32.0),
86            )),
87            writer_close_duration_seconds: registry.register(metric!(
88                name: "mz_sink_iceberg_writer_close_duration_seconds",
89                help: "Time spent closing Iceberg DeltaWriters in seconds",
90                var_labels: ["sink_id", "worker_id"],
91                buckets: histogram_seconds_buckets(0.001, 32.0),
92            )),
93        }
94    }
95}
96
97pub(crate) struct IcebergSinkMetrics {
98    /// Number of data files written by the iceberg sink.
99    pub data_files_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
100    /// Number of delete files written by the iceberg sink.
101    pub delete_files_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
102    /// Number of stashed rows in the iceberg sink.
103    pub stashed_rows: DeleteOnDropGauge<AtomicU64, Vec<String>>,
104    /// Number of snapshots committed by the iceberg sink.
105    pub snapshots_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
106    /// Number of commit failures in the iceberg sink.
107    pub commit_failures: DeleteOnDropCounter<AtomicU64, Vec<String>>,
108    /// Number of commit conflicts in the iceberg sink.
109    pub commit_conflicts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
110    /// Time spent committing batches to Iceberg.
111    pub commit_duration_seconds: DeleteOnDropHistogram<Vec<String>>,
112    /// Time spent closing Iceberg DeltaWriters.
113    pub writer_close_duration_seconds: DeleteOnDropHistogram<Vec<String>>,
114}
115
116impl IcebergSinkMetrics {
117    /// Create a `IcebergSinkMetrics` from the `IcebergSinkMetricDefs`.
118    pub(crate) fn new(defs: &IcebergSinkMetricDefs, sink_id: GlobalId, worker_id: usize) -> Self {
119        let labels = vec![sink_id.to_string(), worker_id.to_string()];
120        Self {
121            data_files_written: defs
122                .data_files_written
123                .get_delete_on_drop_metric(labels.clone()),
124            delete_files_written: defs
125                .delete_files_written
126                .get_delete_on_drop_metric(labels.clone()),
127            stashed_rows: defs.stashed_rows.get_delete_on_drop_metric(labels.clone()),
128            snapshots_committed: defs
129                .snapshots_committed
130                .get_delete_on_drop_metric(labels.clone()),
131            commit_failures: defs
132                .commit_failures
133                .get_delete_on_drop_metric(labels.clone()),
134            commit_conflicts: defs
135                .commit_conflicts
136                .get_delete_on_drop_metric(labels.clone()),
137            commit_duration_seconds: defs
138                .commit_duration_seconds
139                .get_delete_on_drop_metric(labels.clone()),
140            writer_close_duration_seconds: defs
141                .writer_close_duration_seconds
142                .get_delete_on_drop_metric(labels),
143        }
144    }
145}