// mz_storage/metrics/sink/iceberg.rs
use mz_ore::{
    metric,
    metrics::{
        DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, HistogramVec, IntCounterVec,
        UIntGaugeVec,
    },
    stats::histogram_seconds_buckets,
};
use mz_repr::GlobalId;
use prometheus::core::AtomicU64;

23#[derive(Debug, Clone)]
24pub(crate) struct IcebergSinkMetricDefs {
25 pub data_files_written: IntCounterVec,
27 pub delete_files_written: IntCounterVec,
29 pub stashed_rows: UIntGaugeVec,
31 pub snapshots_committed: IntCounterVec,
33 pub commit_failures: IntCounterVec,
35 pub commit_conflicts: IntCounterVec,
37 pub commit_duration_seconds: HistogramVec,
39 pub writer_close_duration_seconds: HistogramVec,
41}
42
43impl IcebergSinkMetricDefs {
44 pub(crate) fn register_with(registry: &mz_ore::metrics::MetricsRegistry) -> Self {
50 Self {
51 data_files_written: registry.register(metric!(
52 name: "mz_sink_iceberg_data_files_written",
53 help: "Number of data files written by the iceberg sink",
54 var_labels: ["sink_id", "worker_id"]
55 )),
56 delete_files_written: registry.register(metric!(
57 name: "mz_sink_iceberg_delete_files_written",
58 help: "Number of delete files written by the iceberg sink",
59 var_labels: ["sink_id", "worker_id"]
60 )),
61 stashed_rows: registry.register(metric!(
62 name: "mz_sink_iceberg_stashed_rows",
63 help: "Number of stashed rows in the iceberg sink",
64 var_labels: ["sink_id", "worker_id"]
65 )),
66 snapshots_committed: registry.register(metric!(
67 name: "mz_sink_iceberg_snapshots_committed",
68 help: "Number of snapshots committed by the iceberg sink",
69 var_labels: ["sink_id", "worker_id"]
70 )),
71 commit_failures: registry.register(metric!(
72 name: "mz_sink_iceberg_commit_failures",
73 help: "Number of commit failures in the iceberg sink",
74 var_labels: ["sink_id", "worker_id"]
75 )),
76 commit_conflicts: registry.register(metric!(
77 name: "mz_sink_iceberg_commit_conflicts",
78 help: "Number of commit conflicts in the iceberg sink",
79 var_labels: ["sink_id", "worker_id"]
80 )),
81 commit_duration_seconds: registry.register(metric!(
82 name: "mz_sink_iceberg_commit_duration_seconds",
83 help: "Time spent committing batches to Iceberg in seconds",
84 var_labels: ["sink_id", "worker_id"],
85 buckets: histogram_seconds_buckets(0.001, 32.0),
86 )),
87 writer_close_duration_seconds: registry.register(metric!(
88 name: "mz_sink_iceberg_writer_close_duration_seconds",
89 help: "Time spent closing Iceberg DeltaWriters in seconds",
90 var_labels: ["sink_id", "worker_id"],
91 buckets: histogram_seconds_buckets(0.001, 32.0),
92 )),
93 }
94 }
95}
96
97pub(crate) struct IcebergSinkMetrics {
98 pub data_files_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
100 pub delete_files_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
102 pub stashed_rows: DeleteOnDropGauge<AtomicU64, Vec<String>>,
104 pub snapshots_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
106 pub commit_failures: DeleteOnDropCounter<AtomicU64, Vec<String>>,
108 pub commit_conflicts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
110 pub commit_duration_seconds: DeleteOnDropHistogram<Vec<String>>,
112 pub writer_close_duration_seconds: DeleteOnDropHistogram<Vec<String>>,
114}
115
116impl IcebergSinkMetrics {
117 pub(crate) fn new(defs: &IcebergSinkMetricDefs, sink_id: GlobalId, worker_id: usize) -> Self {
119 let labels = vec![sink_id.to_string(), worker_id.to_string()];
120 Self {
121 data_files_written: defs
122 .data_files_written
123 .get_delete_on_drop_metric(labels.clone()),
124 delete_files_written: defs
125 .delete_files_written
126 .get_delete_on_drop_metric(labels.clone()),
127 stashed_rows: defs.stashed_rows.get_delete_on_drop_metric(labels.clone()),
128 snapshots_committed: defs
129 .snapshots_committed
130 .get_delete_on_drop_metric(labels.clone()),
131 commit_failures: defs
132 .commit_failures
133 .get_delete_on_drop_metric(labels.clone()),
134 commit_conflicts: defs
135 .commit_conflicts
136 .get_delete_on_drop_metric(labels.clone()),
137 commit_duration_seconds: defs
138 .commit_duration_seconds
139 .get_delete_on_drop_metric(labels.clone()),
140 writer_close_duration_seconds: defs
141 .writer_close_duration_seconds
142 .get_delete_on_drop_metric(labels),
143 }
144 }
145}