mz_storage/metrics/sink/
iceberg.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Metrics for iceberg sinks.
11
12use mz_ore::{
13    metric,
14    metrics::{DeleteOnDropCounter, DeleteOnDropGauge, IntCounterVec, UIntGaugeVec},
15};
16use mz_repr::GlobalId;
17use prometheus::core::AtomicU64;
18
19#[derive(Debug, Clone)]
20pub(crate) struct IcebergSinkMetricDefs {
21    /// Number of rows written by the iceberg sink.
22    pub rows_written: IntCounterVec,
23    /// Number of rows deleted by the iceberg sink.
24    pub rows_deleted: IntCounterVec,
25    /// Number of data files written by the iceberg sink.
26    pub data_files_written: IntCounterVec,
27    /// Number of delete files written by the iceberg sink.
28    pub delete_files_written: IntCounterVec,
29    /// Number of stashed rows in the iceberg sink.
30    pub stashed_rows: UIntGaugeVec,
31    /// Total number of bytes written in data and delete files to object storage.
32    pub bytes_written: IntCounterVec,
33    /// Number of snapshots committed by the iceberg sink.
34    pub snapshots_committed: IntCounterVec,
35    /// Commit failures in the iceberg sink.
36    pub commit_failures: IntCounterVec,
37    /// Commit conflicts in the iceberg sink.
38    pub commit_conflicts: IntCounterVec,
39}
40
41impl IcebergSinkMetricDefs {
42    // Every metric must have a worker specific id associated with it. These are later wrapped
43    // in a DeleteOnDrop helper. If the label was just `source_id` and one worker completed, it
44    // would call the DeleteOnDrop code that deregestiers the metric for `source_id`. Other
45    // workers may still be running, but the metrics registry will no longer record or report
46    // metrics for that `source_id`.
47    pub(crate) fn register_with(registry: &mz_ore::metrics::MetricsRegistry) -> Self {
48        Self {
49            rows_written: registry.register(metric!(
50                name: "sink_iceberg_rows_written",
51                help: "Number of rows written by the iceberg sink",
52                var_labels: ["sink_id", "worker_id"]
53            )),
54            rows_deleted: registry.register(metric!(
55                name: "sink_iceberg_rows_deleted",
56                help: "Number of rows deleted by the iceberg sink",
57                var_labels: ["sink_id", "worker_id"]
58            )),
59            data_files_written: registry.register(metric!(
60                name: "sink_iceberg_data_files_written",
61                help: "Number of data files written by the iceberg sink",
62                var_labels: ["sink_id", "worker_id"]
63            )),
64            delete_files_written: registry.register(metric!(
65                name: "sink_iceberg_delete_files_written",
66                help: "Number of delete files written by the iceberg sink",
67                var_labels: ["sink_id", "worker_id"]
68            )),
69            stashed_rows: registry.register(metric!(
70                name: "sink_iceberg_stashed_rows",
71                help: "Number of stashed rows in the iceberg sink",
72                var_labels: ["sink_id", "worker_id"]
73            )),
74            bytes_written: registry.register(metric!(
75                name: "sink_iceberg_bytes_written",
76                help: "Number of bytes written by the iceberg sink",
77                var_labels: ["sink_id", "worker_id"]
78            )),
79            snapshots_committed: registry.register(metric!(
80                name: "sink_iceberg_snapshots_committed",
81                help: "Number of snapshots committed by the iceberg sink",
82                var_labels: ["sink_id", "worker_id"]
83            )),
84            commit_failures: registry.register(metric!(
85                name: "sink_iceberg_commit_failures",
86                help: "Number of commit failures in the iceberg sink",
87                var_labels: ["sink_id", "worker_id"]
88            )),
89            commit_conflicts: registry.register(metric!(
90                name: "sink_iceberg_commit_conflicts",
91                help: "Number of commit conflicts in the iceberg sink",
92                var_labels: ["sink_id", "worker_id"]
93            )),
94        }
95    }
96}
97
98#[derive(Clone)]
99pub(crate) struct IcebergSinkMetrics {
100    /// Number of rows written by the iceberg sink.
101    pub rows_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
102    /// Number of rows deleted by the iceberg sink.
103    pub rows_deleted: DeleteOnDropCounter<AtomicU64, Vec<String>>,
104    /// Number of data files written by the iceberg sink.
105    pub data_files_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
106    /// Number of delete files written by the iceberg sink.
107    pub delete_files_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
108    /// Number of stashed rows in the iceberg sink.
109    pub stashed_rows: DeleteOnDropGauge<AtomicU64, Vec<String>>,
110    /// Number of bytes written by the iceberg sink.
111    pub bytes_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
112    /// Number of snapshots committed by the iceberg sink.
113    pub snapshots_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
114    /// Number of commit failures in the iceberg sink.
115    pub commit_failures: DeleteOnDropCounter<AtomicU64, Vec<String>>,
116    /// Number of commit conflicts in the iceberg sink.
117    pub commit_conflicts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
118}
119
120impl IcebergSinkMetrics {
121    /// Create a `IcebergSinkMetrics` from the `IcebergSinkMetricDefs`.
122    pub(crate) fn new(defs: &IcebergSinkMetricDefs, sink_id: GlobalId, worker_id: usize) -> Self {
123        let labels = vec![sink_id.to_string(), worker_id.to_string()];
124        Self {
125            rows_written: defs.rows_written.get_delete_on_drop_metric(labels.clone()),
126            rows_deleted: defs.rows_deleted.get_delete_on_drop_metric(labels.clone()),
127            data_files_written: defs
128                .data_files_written
129                .get_delete_on_drop_metric(labels.clone()),
130            delete_files_written: defs
131                .delete_files_written
132                .get_delete_on_drop_metric(labels.clone()),
133            stashed_rows: defs.stashed_rows.get_delete_on_drop_metric(labels.clone()),
134            bytes_written: defs.bytes_written.get_delete_on_drop_metric(labels.clone()),
135            snapshots_committed: defs
136                .snapshots_committed
137                .get_delete_on_drop_metric(labels.clone()),
138            commit_failures: defs
139                .commit_failures
140                .get_delete_on_drop_metric(labels.clone()),
141            commit_conflicts: defs.commit_conflicts.get_delete_on_drop_metric(labels),
142        }
143    }
144}