1use mz_ore::{
13 metric,
14 metrics::{
15 DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, HistogramVec, IntCounterVec,
16 MetricVisibility, UIntGaugeVec,
17 },
18 stats::histogram_seconds_buckets,
19};
20use mz_repr::GlobalId;
21use prometheus::core::AtomicU64;
22
23#[derive(Debug, Clone)]
24pub(crate) struct IcebergSinkMetricDefs {
25 pub data_files_written: IntCounterVec,
27 pub delete_files_written: IntCounterVec,
29 pub stashed_rows: UIntGaugeVec,
31 pub snapshots_committed: IntCounterVec,
33 pub commit_failures: IntCounterVec,
35 pub commit_conflicts: IntCounterVec,
37 pub commit_duration_seconds: HistogramVec,
39 pub writer_close_duration_seconds: HistogramVec,
41}
42
43impl IcebergSinkMetricDefs {
44 pub(crate) fn register_with(registry: &mz_ore::metrics::MetricsRegistry) -> Self {
50 Self {
51 data_files_written: registry.register(metric!(
52 name: "mz_sink_iceberg_data_files_written",
53 help: "Number of data files written by the iceberg sink",
54 var_labels: ["sink_id", "worker_id"],
55 visibility: MetricVisibility::Public,
56 )),
57 delete_files_written: registry.register(metric!(
58 name: "mz_sink_iceberg_delete_files_written",
59 help: "Number of delete files written by the iceberg sink",
60 var_labels: ["sink_id", "worker_id"],
61 visibility: MetricVisibility::Public,
62 )),
63 stashed_rows: registry.register(metric!(
64 name: "mz_sink_iceberg_stashed_rows",
65 help: "Number of stashed rows in the iceberg sink",
66 var_labels: ["sink_id", "worker_id"]
67 )),
68 snapshots_committed: registry.register(metric!(
69 name: "mz_sink_iceberg_snapshots_committed",
70 help: "Number of snapshots committed by the iceberg sink",
71 var_labels: ["sink_id", "worker_id"],
72 visibility: MetricVisibility::Public,
73 )),
74 commit_failures: registry.register(metric!(
75 name: "mz_sink_iceberg_commit_failures",
76 help: "Number of commit failures in the iceberg sink",
77 var_labels: ["sink_id", "worker_id"],
78 visibility: MetricVisibility::Public,
79 )),
80 commit_conflicts: registry.register(metric!(
81 name: "mz_sink_iceberg_commit_conflicts",
82 help: "Number of commit conflicts in the iceberg sink",
83 var_labels: ["sink_id", "worker_id"],
84 visibility: MetricVisibility::Public,
85 )),
86 commit_duration_seconds: registry.register(metric!(
87 name: "mz_sink_iceberg_commit_duration_seconds",
88 help: "Time spent committing batches to Iceberg in seconds",
89 var_labels: ["sink_id", "worker_id"],
90 buckets: histogram_seconds_buckets(0.001, 32.0),
91 visibility: MetricVisibility::Public,
92 )),
93 writer_close_duration_seconds: registry.register(metric!(
94 name: "mz_sink_iceberg_writer_close_duration_seconds",
95 help: "Time spent closing Iceberg DeltaWriters in seconds",
96 var_labels: ["sink_id", "worker_id"],
97 buckets: histogram_seconds_buckets(0.001, 32.0),
98 )),
99 }
100 }
101}
102
103pub(crate) struct IcebergSinkMetrics {
104 pub data_files_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
106 pub delete_files_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
108 pub stashed_rows: DeleteOnDropGauge<AtomicU64, Vec<String>>,
110 pub snapshots_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
112 pub commit_failures: DeleteOnDropCounter<AtomicU64, Vec<String>>,
114 pub commit_conflicts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
116 pub commit_duration_seconds: DeleteOnDropHistogram<Vec<String>>,
118 pub writer_close_duration_seconds: DeleteOnDropHistogram<Vec<String>>,
120}
121
122impl IcebergSinkMetrics {
123 pub(crate) fn new(defs: &IcebergSinkMetricDefs, sink_id: GlobalId, worker_id: usize) -> Self {
125 let labels = vec![sink_id.to_string(), worker_id.to_string()];
126 Self {
127 data_files_written: defs
128 .data_files_written
129 .get_delete_on_drop_metric(labels.clone()),
130 delete_files_written: defs
131 .delete_files_written
132 .get_delete_on_drop_metric(labels.clone()),
133 stashed_rows: defs.stashed_rows.get_delete_on_drop_metric(labels.clone()),
134 snapshots_committed: defs
135 .snapshots_committed
136 .get_delete_on_drop_metric(labels.clone()),
137 commit_failures: defs
138 .commit_failures
139 .get_delete_on_drop_metric(labels.clone()),
140 commit_conflicts: defs
141 .commit_conflicts
142 .get_delete_on_drop_metric(labels.clone()),
143 commit_duration_seconds: defs
144 .commit_duration_seconds
145 .get_delete_on_drop_metric(labels.clone()),
146 writer_close_duration_seconds: defs
147 .writer_close_duration_seconds
148 .get_delete_on_drop_metric(labels),
149 }
150 }
151}