1use mz_ore::{
13 metric,
14 metrics::{
15 DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, HistogramVec, IntCounterVec,
16 MetricTag, MetricVisibility, UIntGaugeVec,
17 },
18 stats::histogram_seconds_buckets,
19};
20use mz_repr::GlobalId;
21use prometheus::core::AtomicU64;
22
23#[derive(Debug, Clone)]
24pub(crate) struct IcebergSinkMetricDefs {
25 pub data_files_written: IntCounterVec,
27 pub delete_files_written: IntCounterVec,
29 pub stashed_rows: UIntGaugeVec,
31 pub snapshots_committed: IntCounterVec,
33 pub commit_failures: IntCounterVec,
35 pub commit_conflicts: IntCounterVec,
37 pub commit_duration_seconds: HistogramVec,
39 pub writer_close_duration_seconds: HistogramVec,
41}
42
43impl IcebergSinkMetricDefs {
44 pub(crate) fn register_with(registry: &mz_ore::metrics::MetricsRegistry) -> Self {
50 Self {
51 data_files_written: registry.register(metric!(
52 name: "mz_sink_iceberg_data_files_written",
53 help: "Number of data files written by the iceberg sink",
54 var_labels: ["sink_id", "worker_id"],
55 visibility: MetricVisibility::Public,
56 tags: [MetricTag::Sink],
57 )),
58 delete_files_written: registry.register(metric!(
59 name: "mz_sink_iceberg_delete_files_written",
60 help: "Number of delete files written by the iceberg sink",
61 var_labels: ["sink_id", "worker_id"],
62 visibility: MetricVisibility::Public,
63 tags: [MetricTag::Sink],
64 )),
65 stashed_rows: registry.register(metric!(
66 name: "mz_sink_iceberg_stashed_rows",
67 help: "Number of stashed rows in the iceberg sink",
68 var_labels: ["sink_id", "worker_id"]
69 )),
70 snapshots_committed: registry.register(metric!(
71 name: "mz_sink_iceberg_snapshots_committed",
72 help: "Number of snapshots committed by the iceberg sink",
73 var_labels: ["sink_id", "worker_id"],
74 visibility: MetricVisibility::Public,
75 tags: [MetricTag::Sink],
76 )),
77 commit_failures: registry.register(metric!(
78 name: "mz_sink_iceberg_commit_failures",
79 help: "Number of commit failures in the iceberg sink",
80 var_labels: ["sink_id", "worker_id"],
81 visibility: MetricVisibility::Public,
82 tags: [MetricTag::Sink],
83 )),
84 commit_conflicts: registry.register(metric!(
85 name: "mz_sink_iceberg_commit_conflicts",
86 help: "Number of commit conflicts in the iceberg sink",
87 var_labels: ["sink_id", "worker_id"],
88 visibility: MetricVisibility::Public,
89 tags: [MetricTag::Sink],
90 )),
91 commit_duration_seconds: registry.register(metric!(
92 name: "mz_sink_iceberg_commit_duration_seconds",
93 help: "Time spent committing batches to Iceberg in seconds",
94 var_labels: ["sink_id", "worker_id"],
95 buckets: histogram_seconds_buckets(0.001, 32.0),
96 visibility: MetricVisibility::Public,
97 tags: [MetricTag::Sink],
98 )),
99 writer_close_duration_seconds: registry.register(metric!(
100 name: "mz_sink_iceberg_writer_close_duration_seconds",
101 help: "Time spent closing Iceberg DeltaWriters in seconds",
102 var_labels: ["sink_id", "worker_id"],
103 buckets: histogram_seconds_buckets(0.001, 32.0),
104 )),
105 }
106 }
107}
108
109pub(crate) struct IcebergSinkMetrics {
110 pub data_files_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
112 pub delete_files_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
114 pub stashed_rows: DeleteOnDropGauge<AtomicU64, Vec<String>>,
116 pub snapshots_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
118 pub commit_failures: DeleteOnDropCounter<AtomicU64, Vec<String>>,
120 pub commit_conflicts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
122 pub commit_duration_seconds: DeleteOnDropHistogram<Vec<String>>,
124 pub writer_close_duration_seconds: DeleteOnDropHistogram<Vec<String>>,
126}
127
128impl IcebergSinkMetrics {
129 pub(crate) fn new(defs: &IcebergSinkMetricDefs, sink_id: GlobalId, worker_id: usize) -> Self {
131 let labels = vec![sink_id.to_string(), worker_id.to_string()];
132 Self {
133 data_files_written: defs
134 .data_files_written
135 .get_delete_on_drop_metric(labels.clone()),
136 delete_files_written: defs
137 .delete_files_written
138 .get_delete_on_drop_metric(labels.clone()),
139 stashed_rows: defs.stashed_rows.get_delete_on_drop_metric(labels.clone()),
140 snapshots_committed: defs
141 .snapshots_committed
142 .get_delete_on_drop_metric(labels.clone()),
143 commit_failures: defs
144 .commit_failures
145 .get_delete_on_drop_metric(labels.clone()),
146 commit_conflicts: defs
147 .commit_conflicts
148 .get_delete_on_drop_metric(labels.clone()),
149 commit_duration_seconds: defs
150 .commit_duration_seconds
151 .get_delete_on_drop_metric(labels.clone()),
152 writer_close_duration_seconds: defs
153 .writer_close_duration_seconds
154 .get_delete_on_drop_metric(labels),
155 }
156 }
157}