1use std::fmt::Debug;
19use std::time::Instant;
20
21use bytes::Bytes;
22use lgalloc::AllocError;
23use prometheus::{Counter, CounterVec, Histogram, IntCounter, IntCounterVec};
24use tracing::debug;
25
26use crate::cast::{CastFrom, CastLossy};
27use crate::metric;
28use crate::metrics::MetricsRegistry;
29use crate::region::Region;
30
31impl From<MetricsRegion<u8>> for Bytes {
32 fn from(bytes: MetricsRegion<u8>) -> Bytes {
33 Bytes::from_owner(bytes)
36 }
37}
38
39pub struct MetricsRegion<T: Copy> {
44 buf: Region<T>,
45 free_count: IntCounter,
46 free_capacity_bytes: IntCounter,
47}
48
49impl<T: Copy + Debug> Debug for MetricsRegion<T> {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 std::fmt::Debug::fmt(self.buf.as_vec(), f)
52 }
53}
54
55impl<T: Copy> MetricsRegion<T> {
56 fn capacity_bytes(&self) -> usize {
57 self.buf.capacity() * std::mem::size_of::<T>()
58 }
59
60 pub fn extend_from_slice(&mut self, slice: &[T]) {
66 self.buf.extend_from_slice(slice);
67 }
68}
69
70impl<T: Copy + PartialEq> PartialEq for MetricsRegion<T> {
71 fn eq(&self, other: &Self) -> bool {
72 self.buf.as_vec() == other.buf.as_vec()
73 }
74}
75
76impl<T: Copy + Eq> Eq for MetricsRegion<T> {}
77
78impl<T: Copy> Drop for MetricsRegion<T> {
79 fn drop(&mut self) {
80 self.free_count.inc();
81 self.free_capacity_bytes
82 .inc_by(u64::cast_from(self.capacity_bytes()));
83 }
84}
85
86impl<T: Copy> AsRef<[T]> for MetricsRegion<T> {
87 fn as_ref(&self) -> &[T] {
88 &self.buf[..]
89 }
90}
91
92#[derive(Debug, Clone)]
94pub struct LgBytesMetrics {
95 pub persist_s3: LgBytesOpMetrics,
97 pub persist_azure: LgBytesOpMetrics,
99 pub persist_arrow: LgBytesOpMetrics,
101}
102
103#[derive(Clone)]
105pub struct LgBytesOpMetrics {
106 heap: LgBytesRegionMetrics,
107 mmap: LgBytesRegionMetrics,
108 alloc_seconds: Counter,
109 mmap_disabled_count: IntCounter,
110 mmap_error_count: IntCounter,
111 len_sizes: Histogram,
114}
115
116impl std::fmt::Debug for LgBytesOpMetrics {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 f.debug_struct("LgBytesOperationMetrics")
119 .finish_non_exhaustive()
120 }
121}
122
123#[derive(Clone)]
124struct LgBytesRegionMetrics {
125 alloc_count: IntCounter,
126 alloc_capacity_bytes: IntCounter,
127 free_count: IntCounter,
128 free_capacity_bytes: IntCounter,
129}
130
131impl LgBytesMetrics {
132 pub fn new(registry: &MetricsRegistry) -> Self {
134 let alloc_count: IntCounterVec = registry.register(metric!(
135 name: "mz_lgbytes_alloc_count",
136 help: "count of LgBytes allocations",
137 var_labels: ["op", "region"],
138 ));
139 let alloc_capacity_bytes: IntCounterVec = registry.register(metric!(
140 name: "mz_lgbytes_alloc_capacity_bytes",
141 help: "total capacity bytes of LgBytes allocations",
142 var_labels: ["op", "region"],
143 ));
144 let free_count: IntCounterVec = registry.register(metric!(
145 name: "mz_lgbytes_free_count",
146 help: "count of LgBytes frees",
147 var_labels: ["op", "region"],
148 ));
149 let free_capacity_bytes: IntCounterVec = registry.register(metric!(
150 name: "mz_lgbytes_free_capacity_bytes",
151 help: "total capacity bytes of LgBytes frees",
152 var_labels: ["op", "region"],
153 ));
154 let alloc_seconds: CounterVec = registry.register(metric!(
155 name: "mz_lgbytes_alloc_seconds",
156 help: "seconds spent getting LgBytes allocations and copying in data",
157 var_labels: ["op"],
158 ));
159 let mmap_disabled_count: IntCounter = registry.register(metric!(
160 name: "mz_bytes_mmap_disabled_count",
161 help: "count alloc attempts with lgalloc disabled",
162 ));
163 let mmap_error_count: IntCounter = registry.register(metric!(
164 name: "mz_bytes_mmap_error_count",
165 help: "count of errors when attempting file-based mapped alloc",
166 ));
167 let len_sizes: Histogram = registry.register(metric!(
168 name: "mz_bytes_alloc_len_sizes",
169 help: "histogram of LgBytes alloc len sizes",
170 buckets: crate::stats::HISTOGRAM_BYTE_BUCKETS.to_vec(),
171 ));
172 let op = |name: &str| LgBytesOpMetrics {
173 heap: LgBytesRegionMetrics {
174 alloc_count: alloc_count.with_label_values(&[name, "heap"]),
175 alloc_capacity_bytes: alloc_capacity_bytes.with_label_values(&[name, "heap"]),
176 free_count: free_count.with_label_values(&[name, "heap"]),
177 free_capacity_bytes: free_capacity_bytes.with_label_values(&[name, "heap"]),
178 },
179 mmap: LgBytesRegionMetrics {
180 alloc_count: alloc_count.with_label_values(&[name, "mmap"]),
181 alloc_capacity_bytes: alloc_capacity_bytes.with_label_values(&[name, "mmap"]),
182 free_count: free_count.with_label_values(&[name, "mmap"]),
183 free_capacity_bytes: free_capacity_bytes.with_label_values(&[name, "mmap"]),
184 },
185 alloc_seconds: alloc_seconds.with_label_values(&[name]),
186 mmap_disabled_count: mmap_disabled_count.clone(),
187 mmap_error_count: mmap_error_count.clone(),
188 len_sizes: len_sizes.clone(),
189 };
190 LgBytesMetrics {
191 persist_s3: op("persist_s3"),
192 persist_azure: op("persist_azure"),
193 persist_arrow: op("persist_arrow"),
194 }
195 }
196}
197
198impl LgBytesOpMetrics {
199 pub fn new_region<T: Copy>(&self, capacity: usize) -> MetricsRegion<T> {
201 let start = Instant::now();
202
203 let capacity = std::cmp::max(capacity, 1 << lgalloc::VALID_SIZE_CLASS.start);
205 let region = match Region::new_mmap(capacity) {
206 Ok(region) => region,
207 Err(err) => {
208 if let AllocError::Disabled = err {
209 self.mmap_disabled_count.inc()
210 } else {
211 debug!("failed to mmap allocate: {}", err);
212 self.mmap_error_count.inc();
213 }
214 Region::new_heap(capacity)
215 }
216 };
217 let region = self.metrics_region(region);
218 self.alloc_seconds.inc_by(start.elapsed().as_secs_f64());
219
220 region
221 }
222
223 pub fn try_mmap_bytes(&self, buf: Bytes) -> Bytes {
226 self.try_mmap_region(buf.as_ref())
227 .map(Bytes::from)
228 .unwrap_or(buf)
229 }
230
231 pub fn try_mmap_region<T: Copy>(
233 &self,
234 buf: impl AsRef<[T]>,
235 ) -> Result<MetricsRegion<T>, AllocError> {
236 let start = Instant::now();
237 let buf = buf.as_ref();
238 let capacity = std::cmp::max(buf.len(), 1 << lgalloc::VALID_SIZE_CLASS.start);
240 let buf = match Region::new_mmap(capacity) {
241 Ok(mut region) => {
242 region.extend_from_slice(buf);
243 Ok(region)
244 }
245 Err(err) => {
246 match &err {
247 AllocError::Disabled => self.mmap_disabled_count.inc(),
248 err => {
249 debug!("failed to mmap allocate: {}", err);
250 self.mmap_error_count.inc();
251 }
252 };
253 Err(err)
254 }
255 }?;
256 let region = self.metrics_region(buf);
257 self.alloc_seconds.inc_by(start.elapsed().as_secs_f64());
258 Ok(region)
259 }
260
261 pub fn heap_region<T: Copy>(&self, buf: Vec<T>) -> MetricsRegion<T> {
265 self.metrics_region(Region::Heap(buf))
267 }
268
269 fn metrics_region<T: Copy>(&self, buf: Region<T>) -> MetricsRegion<T> {
270 let metrics = match buf {
271 Region::MMap(_) => &self.mmap,
272 Region::Heap(_) => &self.heap,
273 };
274 let region = MetricsRegion {
275 buf,
276 free_count: metrics.free_count.clone(),
277 free_capacity_bytes: metrics.free_capacity_bytes.clone(),
278 };
279 metrics.alloc_count.inc();
280 metrics
281 .alloc_capacity_bytes
282 .inc_by(u64::cast_from(region.capacity_bytes()));
283 let len_bytes = region.buf.len() * std::mem::size_of::<T>();
284 self.len_sizes.observe(f64::cast_lossy(len_bytes));
285 region
286 }
287}