mz_storage/metrics/sink/
iceberg.rs1use mz_ore::{
13 metric,
14 metrics::{DeleteOnDropCounter, DeleteOnDropGauge, IntCounterVec, UIntGaugeVec},
15};
16use mz_repr::GlobalId;
17use prometheus::core::AtomicU64;
18
19#[derive(Debug, Clone)]
20pub(crate) struct IcebergSinkMetricDefs {
21 pub rows_written: IntCounterVec,
23 pub rows_deleted: IntCounterVec,
25 pub data_files_written: IntCounterVec,
27 pub delete_files_written: IntCounterVec,
29 pub stashed_rows: UIntGaugeVec,
31 pub bytes_written: IntCounterVec,
33 pub snapshots_committed: IntCounterVec,
35 pub commit_failures: IntCounterVec,
37 pub commit_conflicts: IntCounterVec,
39}
40
41impl IcebergSinkMetricDefs {
42 pub(crate) fn register_with(registry: &mz_ore::metrics::MetricsRegistry) -> Self {
48 Self {
49 rows_written: registry.register(metric!(
50 name: "sink_iceberg_rows_written",
51 help: "Number of rows written by the iceberg sink",
52 var_labels: ["sink_id", "worker_id"]
53 )),
54 rows_deleted: registry.register(metric!(
55 name: "sink_iceberg_rows_deleted",
56 help: "Number of rows deleted by the iceberg sink",
57 var_labels: ["sink_id", "worker_id"]
58 )),
59 data_files_written: registry.register(metric!(
60 name: "sink_iceberg_data_files_written",
61 help: "Number of data files written by the iceberg sink",
62 var_labels: ["sink_id", "worker_id"]
63 )),
64 delete_files_written: registry.register(metric!(
65 name: "sink_iceberg_delete_files_written",
66 help: "Number of delete files written by the iceberg sink",
67 var_labels: ["sink_id", "worker_id"]
68 )),
69 stashed_rows: registry.register(metric!(
70 name: "sink_iceberg_stashed_rows",
71 help: "Number of stashed rows in the iceberg sink",
72 var_labels: ["sink_id", "worker_id"]
73 )),
74 bytes_written: registry.register(metric!(
75 name: "sink_iceberg_bytes_written",
76 help: "Number of bytes written by the iceberg sink",
77 var_labels: ["sink_id", "worker_id"]
78 )),
79 snapshots_committed: registry.register(metric!(
80 name: "sink_iceberg_snapshots_committed",
81 help: "Number of snapshots committed by the iceberg sink",
82 var_labels: ["sink_id", "worker_id"]
83 )),
84 commit_failures: registry.register(metric!(
85 name: "sink_iceberg_commit_failures",
86 help: "Number of commit failures in the iceberg sink",
87 var_labels: ["sink_id", "worker_id"]
88 )),
89 commit_conflicts: registry.register(metric!(
90 name: "sink_iceberg_commit_conflicts",
91 help: "Number of commit conflicts in the iceberg sink",
92 var_labels: ["sink_id", "worker_id"]
93 )),
94 }
95 }
96}
97
98#[derive(Clone)]
99pub(crate) struct IcebergSinkMetrics {
100 pub rows_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
102 pub rows_deleted: DeleteOnDropCounter<AtomicU64, Vec<String>>,
104 pub data_files_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
106 pub delete_files_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
108 pub stashed_rows: DeleteOnDropGauge<AtomicU64, Vec<String>>,
110 pub bytes_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
112 pub snapshots_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
114 pub commit_failures: DeleteOnDropCounter<AtomicU64, Vec<String>>,
116 pub commit_conflicts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
118}
119
120impl IcebergSinkMetrics {
121 pub(crate) fn new(defs: &IcebergSinkMetricDefs, sink_id: GlobalId, worker_id: usize) -> Self {
123 let labels = vec![sink_id.to_string(), worker_id.to_string()];
124 Self {
125 rows_written: defs.rows_written.get_delete_on_drop_metric(labels.clone()),
126 rows_deleted: defs.rows_deleted.get_delete_on_drop_metric(labels.clone()),
127 data_files_written: defs
128 .data_files_written
129 .get_delete_on_drop_metric(labels.clone()),
130 delete_files_written: defs
131 .delete_files_written
132 .get_delete_on_drop_metric(labels.clone()),
133 stashed_rows: defs.stashed_rows.get_delete_on_drop_metric(labels.clone()),
134 bytes_written: defs.bytes_written.get_delete_on_drop_metric(labels.clone()),
135 snapshots_committed: defs
136 .snapshots_committed
137 .get_delete_on_drop_metric(labels.clone()),
138 commit_failures: defs
139 .commit_failures
140 .get_delete_on_drop_metric(labels.clone()),
141 commit_conflicts: defs.commit_conflicts.get_delete_on_drop_metric(labels),
142 }
143 }
144}