1use std::fmt::Debug;
19use std::time::Instant;
20
21use bytes::Bytes;
22use lgalloc::AllocError;
23use prometheus::{Counter, CounterVec, Histogram, IntCounter, IntCounterVec};
24use tracing::debug;
25
26use crate::cast::{CastFrom, CastLossy};
27use crate::metric;
28use crate::metrics::MetricsRegistry;
29use crate::region::Region;
30
31impl From<MetricsRegion<u8>> for Bytes {
32 fn from(bytes: MetricsRegion<u8>) -> Bytes {
33 Bytes::from_owner(bytes)
36 }
37}
38
39pub struct MetricsRegion<T: Copy> {
44 buf: Region<T>,
45 free_count: IntCounter,
46 free_capacity_bytes: IntCounter,
47}
48
49impl<T: Copy + Debug> Debug for MetricsRegion<T> {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 std::fmt::Debug::fmt(self.buf.as_vec(), f)
52 }
53}
54
55impl<T: Copy> MetricsRegion<T> {
56 fn capacity_bytes(&self) -> usize {
57 self.buf.capacity() * std::mem::size_of::<T>()
58 }
59
60 pub fn extend_from_slice(&mut self, slice: &[T]) {
66 self.buf.extend_from_slice(slice);
67 }
68}
69
70impl<T: Copy + PartialEq> PartialEq for MetricsRegion<T> {
71 fn eq(&self, other: &Self) -> bool {
72 self.buf.as_vec() == other.buf.as_vec()
73 }
74}
75
76impl<T: Copy + Eq> Eq for MetricsRegion<T> {}
77
78impl<T: Copy> Drop for MetricsRegion<T> {
79 fn drop(&mut self) {
80 self.free_count.inc();
81 self.free_capacity_bytes
82 .inc_by(u64::cast_from(self.capacity_bytes()));
83 }
84}
85
86impl<T: Copy> AsRef<[T]> for MetricsRegion<T> {
87 fn as_ref(&self) -> &[T] {
88 &self.buf[..]
89 }
90}
91
92#[derive(Debug, Clone)]
94pub struct LgBytesMetrics {
95 pub persist_azure: LgBytesOpMetrics,
97 pub persist_arrow: LgBytesOpMetrics,
99}
100
101#[derive(Clone)]
103pub struct LgBytesOpMetrics {
104 heap: LgBytesRegionMetrics,
105 mmap: LgBytesRegionMetrics,
106 alloc_seconds: Counter,
107 mmap_disabled_count: IntCounter,
108 mmap_error_count: IntCounter,
109 len_sizes: Histogram,
112}
113
114impl std::fmt::Debug for LgBytesOpMetrics {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 f.debug_struct("LgBytesOperationMetrics")
117 .finish_non_exhaustive()
118 }
119}
120
121#[derive(Clone)]
122struct LgBytesRegionMetrics {
123 alloc_count: IntCounter,
124 alloc_capacity_bytes: IntCounter,
125 free_count: IntCounter,
126 free_capacity_bytes: IntCounter,
127}
128
129impl LgBytesMetrics {
130 pub fn new(registry: &MetricsRegistry) -> Self {
132 let alloc_count: IntCounterVec = registry.register(metric!(
133 name: "mz_lgbytes_alloc_count",
134 help: "count of LgBytes allocations",
135 var_labels: ["op", "region"],
136 ));
137 let alloc_capacity_bytes: IntCounterVec = registry.register(metric!(
138 name: "mz_lgbytes_alloc_capacity_bytes",
139 help: "total capacity bytes of LgBytes allocations",
140 var_labels: ["op", "region"],
141 ));
142 let free_count: IntCounterVec = registry.register(metric!(
143 name: "mz_lgbytes_free_count",
144 help: "count of LgBytes frees",
145 var_labels: ["op", "region"],
146 ));
147 let free_capacity_bytes: IntCounterVec = registry.register(metric!(
148 name: "mz_lgbytes_free_capacity_bytes",
149 help: "total capacity bytes of LgBytes frees",
150 var_labels: ["op", "region"],
151 ));
152 let alloc_seconds: CounterVec = registry.register(metric!(
153 name: "mz_lgbytes_alloc_seconds",
154 help: "seconds spent getting LgBytes allocations and copying in data",
155 var_labels: ["op"],
156 ));
157 let mmap_disabled_count: IntCounter = registry.register(metric!(
158 name: "mz_bytes_mmap_disabled_count",
159 help: "count alloc attempts with lgalloc disabled",
160 ));
161 let mmap_error_count: IntCounter = registry.register(metric!(
162 name: "mz_bytes_mmap_error_count",
163 help: "count of errors when attempting file-based mapped alloc",
164 ));
165 let len_sizes: Histogram = registry.register(metric!(
166 name: "mz_bytes_alloc_len_sizes",
167 help: "histogram of LgBytes alloc len sizes",
168 buckets: crate::stats::HISTOGRAM_BYTE_BUCKETS.to_vec(),
169 ));
170 let op = |name: &str| LgBytesOpMetrics {
171 heap: LgBytesRegionMetrics {
172 alloc_count: alloc_count.with_label_values(&[name, "heap"]),
173 alloc_capacity_bytes: alloc_capacity_bytes.with_label_values(&[name, "heap"]),
174 free_count: free_count.with_label_values(&[name, "heap"]),
175 free_capacity_bytes: free_capacity_bytes.with_label_values(&[name, "heap"]),
176 },
177 mmap: LgBytesRegionMetrics {
178 alloc_count: alloc_count.with_label_values(&[name, "mmap"]),
179 alloc_capacity_bytes: alloc_capacity_bytes.with_label_values(&[name, "mmap"]),
180 free_count: free_count.with_label_values(&[name, "mmap"]),
181 free_capacity_bytes: free_capacity_bytes.with_label_values(&[name, "mmap"]),
182 },
183 alloc_seconds: alloc_seconds.with_label_values(&[name]),
184 mmap_disabled_count: mmap_disabled_count.clone(),
185 mmap_error_count: mmap_error_count.clone(),
186 len_sizes: len_sizes.clone(),
187 };
188 LgBytesMetrics {
189 persist_azure: op("persist_azure"),
190 persist_arrow: op("persist_arrow"),
191 }
192 }
193}
194
195impl LgBytesOpMetrics {
196 pub fn new_region<T: Copy>(&self, capacity: usize) -> MetricsRegion<T> {
198 let start = Instant::now();
199
200 let capacity = std::cmp::max(capacity, 1 << lgalloc::VALID_SIZE_CLASS.start);
202 let region = match Region::new_mmap(capacity) {
203 Ok(region) => region,
204 Err(err) => {
205 if let AllocError::Disabled = err {
206 self.mmap_disabled_count.inc()
207 } else {
208 debug!("failed to mmap allocate: {}", err);
209 self.mmap_error_count.inc();
210 }
211 Region::new_heap(capacity)
212 }
213 };
214 let region = self.metrics_region(region);
215 self.alloc_seconds.inc_by(start.elapsed().as_secs_f64());
216
217 region
218 }
219
220 pub fn try_mmap_bytes(&self, buf: Bytes) -> Bytes {
223 self.try_mmap_region(buf.as_ref())
224 .map(Bytes::from)
225 .unwrap_or(buf)
226 }
227
228 pub fn try_mmap_region<T: Copy>(
230 &self,
231 buf: impl AsRef<[T]>,
232 ) -> Result<MetricsRegion<T>, AllocError> {
233 let start = Instant::now();
234 let buf = buf.as_ref();
235 let capacity = std::cmp::max(buf.len(), 1 << lgalloc::VALID_SIZE_CLASS.start);
237 let buf = match Region::new_mmap(capacity) {
238 Ok(mut region) => {
239 region.extend_from_slice(buf);
240 Ok(region)
241 }
242 Err(err) => {
243 match &err {
244 AllocError::Disabled => self.mmap_disabled_count.inc(),
245 err => {
246 debug!("failed to mmap allocate: {}", err);
247 self.mmap_error_count.inc();
248 }
249 };
250 Err(err)
251 }
252 }?;
253 let region = self.metrics_region(buf);
254 self.alloc_seconds.inc_by(start.elapsed().as_secs_f64());
255 Ok(region)
256 }
257
258 pub fn heap_region<T: Copy>(&self, buf: Vec<T>) -> MetricsRegion<T> {
262 self.metrics_region(Region::Heap(buf))
264 }
265
266 fn metrics_region<T: Copy>(&self, buf: Region<T>) -> MetricsRegion<T> {
267 let metrics = match buf {
268 Region::MMap(_) => &self.mmap,
269 Region::Heap(_) => &self.heap,
270 };
271 let region = MetricsRegion {
272 buf,
273 free_count: metrics.free_count.clone(),
274 free_capacity_bytes: metrics.free_capacity_bytes.clone(),
275 };
276 metrics.alloc_count.inc();
277 metrics
278 .alloc_capacity_bytes
279 .inc_by(u64::cast_from(region.capacity_bytes()));
280 let len_bytes = region.buf.len() * std::mem::size_of::<T>();
281 self.len_sizes.observe(f64::cast_lossy(len_bytes));
282 region
283 }
284}