mz_storage/metrics/sink/
iceberg.rs1use mz_ore::{
13 metric,
14 metrics::{DeleteOnDropCounter, DeleteOnDropGauge, IntCounterVec, UIntGaugeVec},
15};
16use mz_repr::GlobalId;
17use prometheus::core::AtomicU64;
18
19#[derive(Debug, Clone)]
20pub(crate) struct IcebergSinkMetricDefs {
21 pub data_files_written: IntCounterVec,
23 pub delete_files_written: IntCounterVec,
25 pub stashed_rows: UIntGaugeVec,
27 pub snapshots_committed: IntCounterVec,
29 pub commit_failures: IntCounterVec,
31 pub commit_conflicts: IntCounterVec,
33}
34
35impl IcebergSinkMetricDefs {
36 pub(crate) fn register_with(registry: &mz_ore::metrics::MetricsRegistry) -> Self {
42 Self {
43 data_files_written: registry.register(metric!(
44 name: "sink_iceberg_data_files_written",
45 help: "Number of data files written by the iceberg sink",
46 var_labels: ["sink_id", "worker_id"]
47 )),
48 delete_files_written: registry.register(metric!(
49 name: "sink_iceberg_delete_files_written",
50 help: "Number of delete files written by the iceberg sink",
51 var_labels: ["sink_id", "worker_id"]
52 )),
53 stashed_rows: registry.register(metric!(
54 name: "sink_iceberg_stashed_rows",
55 help: "Number of stashed rows in the iceberg sink",
56 var_labels: ["sink_id", "worker_id"]
57 )),
58 snapshots_committed: registry.register(metric!(
59 name: "sink_iceberg_snapshots_committed",
60 help: "Number of snapshots committed by the iceberg sink",
61 var_labels: ["sink_id", "worker_id"]
62 )),
63 commit_failures: registry.register(metric!(
64 name: "sink_iceberg_commit_failures",
65 help: "Number of commit failures in the iceberg sink",
66 var_labels: ["sink_id", "worker_id"]
67 )),
68 commit_conflicts: registry.register(metric!(
69 name: "sink_iceberg_commit_conflicts",
70 help: "Number of commit conflicts in the iceberg sink",
71 var_labels: ["sink_id", "worker_id"]
72 )),
73 }
74 }
75}
76
77#[derive(Clone)]
78pub(crate) struct IcebergSinkMetrics {
79 pub data_files_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
81 pub delete_files_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
83 pub stashed_rows: DeleteOnDropGauge<AtomicU64, Vec<String>>,
85 pub snapshots_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
87 pub commit_failures: DeleteOnDropCounter<AtomicU64, Vec<String>>,
89 pub commit_conflicts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
91}
92
93impl IcebergSinkMetrics {
94 pub(crate) fn new(defs: &IcebergSinkMetricDefs, sink_id: GlobalId, worker_id: usize) -> Self {
96 let labels = vec![sink_id.to_string(), worker_id.to_string()];
97 Self {
98 data_files_written: defs
99 .data_files_written
100 .get_delete_on_drop_metric(labels.clone()),
101 delete_files_written: defs
102 .delete_files_written
103 .get_delete_on_drop_metric(labels.clone()),
104 stashed_rows: defs.stashed_rows.get_delete_on_drop_metric(labels.clone()),
105 snapshots_committed: defs
106 .snapshots_committed
107 .get_delete_on_drop_metric(labels.clone()),
108 commit_failures: defs
109 .commit_failures
110 .get_delete_on_drop_metric(labels.clone()),
111 commit_conflicts: defs.commit_conflicts.get_delete_on_drop_metric(labels),
112 }
113 }
114}