mz_storage/metrics/sink/
iceberg.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Metrics for iceberg sinks.
11
12use mz_ore::{
13    metric,
14    metrics::{DeleteOnDropCounter, DeleteOnDropGauge, IntCounterVec, UIntGaugeVec},
15};
16use mz_repr::GlobalId;
17use prometheus::core::AtomicU64;
18
19#[derive(Debug, Clone)]
20pub(crate) struct IcebergSinkMetricDefs {
21    /// Number of data files written by the iceberg sink.
22    pub data_files_written: IntCounterVec,
23    /// Number of delete files written by the iceberg sink.
24    pub delete_files_written: IntCounterVec,
25    /// Number of stashed rows in the iceberg sink.
26    pub stashed_rows: UIntGaugeVec,
27    /// Number of snapshots committed by the iceberg sink.
28    pub snapshots_committed: IntCounterVec,
29    /// Commit failures in the iceberg sink.
30    pub commit_failures: IntCounterVec,
31    /// Commit conflicts in the iceberg sink.
32    pub commit_conflicts: IntCounterVec,
33}
34
35impl IcebergSinkMetricDefs {
36    // Every metric must have a worker specific id associated with it. These are later wrapped
37    // in a DeleteOnDrop helper. If the label was just `source_id` and one worker completed, it
38    // would call the DeleteOnDrop code that deregestiers the metric for `source_id`. Other
39    // workers may still be running, but the metrics registry will no longer record or report
40    // metrics for that `source_id`.
41    pub(crate) fn register_with(registry: &mz_ore::metrics::MetricsRegistry) -> Self {
42        Self {
43            data_files_written: registry.register(metric!(
44                name: "sink_iceberg_data_files_written",
45                help: "Number of data files written by the iceberg sink",
46                var_labels: ["sink_id", "worker_id"]
47            )),
48            delete_files_written: registry.register(metric!(
49                name: "sink_iceberg_delete_files_written",
50                help: "Number of delete files written by the iceberg sink",
51                var_labels: ["sink_id", "worker_id"]
52            )),
53            stashed_rows: registry.register(metric!(
54                name: "sink_iceberg_stashed_rows",
55                help: "Number of stashed rows in the iceberg sink",
56                var_labels: ["sink_id", "worker_id"]
57            )),
58            snapshots_committed: registry.register(metric!(
59                name: "sink_iceberg_snapshots_committed",
60                help: "Number of snapshots committed by the iceberg sink",
61                var_labels: ["sink_id", "worker_id"]
62            )),
63            commit_failures: registry.register(metric!(
64                name: "sink_iceberg_commit_failures",
65                help: "Number of commit failures in the iceberg sink",
66                var_labels: ["sink_id", "worker_id"]
67            )),
68            commit_conflicts: registry.register(metric!(
69                name: "sink_iceberg_commit_conflicts",
70                help: "Number of commit conflicts in the iceberg sink",
71                var_labels: ["sink_id", "worker_id"]
72            )),
73        }
74    }
75}
76
77#[derive(Clone)]
78pub(crate) struct IcebergSinkMetrics {
79    /// Number of data files written by the iceberg sink.
80    pub data_files_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
81    /// Number of delete files written by the iceberg sink.
82    pub delete_files_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
83    /// Number of stashed rows in the iceberg sink.
84    pub stashed_rows: DeleteOnDropGauge<AtomicU64, Vec<String>>,
85    /// Number of snapshots committed by the iceberg sink.
86    pub snapshots_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
87    /// Number of commit failures in the iceberg sink.
88    pub commit_failures: DeleteOnDropCounter<AtomicU64, Vec<String>>,
89    /// Number of commit conflicts in the iceberg sink.
90    pub commit_conflicts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
91}
92
93impl IcebergSinkMetrics {
94    /// Create a `IcebergSinkMetrics` from the `IcebergSinkMetricDefs`.
95    pub(crate) fn new(defs: &IcebergSinkMetricDefs, sink_id: GlobalId, worker_id: usize) -> Self {
96        let labels = vec![sink_id.to_string(), worker_id.to_string()];
97        Self {
98            data_files_written: defs
99                .data_files_written
100                .get_delete_on_drop_metric(labels.clone()),
101            delete_files_written: defs
102                .delete_files_written
103                .get_delete_on_drop_metric(labels.clone()),
104            stashed_rows: defs.stashed_rows.get_delete_on_drop_metric(labels.clone()),
105            snapshots_committed: defs
106                .snapshots_committed
107                .get_delete_on_drop_metric(labels.clone()),
108            commit_failures: defs
109                .commit_failures
110                .get_delete_on_drop_metric(labels.clone()),
111            commit_conflicts: defs.commit_conflicts.get_delete_on_drop_metric(labels),
112        }
113    }
114}