Skip to main content

mz_ore/
lgbytes.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! The [bytes] crate but backed by [lgalloc].
17
18use std::fmt::Debug;
19use std::time::Instant;
20
21use bytes::Bytes;
22use lgalloc::AllocError;
23use prometheus::{Counter, CounterVec, Histogram, IntCounter, IntCounterVec};
24use tracing::debug;
25
26use crate::cast::{CastFrom, CastLossy};
27use crate::metric;
28use crate::metrics::MetricsRegistry;
29use crate::region::Region;
30
31impl From<MetricsRegion<u8>> for Bytes {
32    fn from(bytes: MetricsRegion<u8>) -> Bytes {
33        // This will handle the drop correctly when the refcount goes to 0...
34        // see the rustdoc on this method for more details.
35        Bytes::from_owner(bytes)
36    }
37}
38
39/// A [Region] wrapper that increments metrics when it is dropped.
40///
41/// The `T: Copy` bound ensures that the `Region` doesn't leak resources when
42/// dropped.
43pub struct MetricsRegion<T: Copy> {
44    buf: Region<T>,
45    free_count: IntCounter,
46    free_capacity_bytes: IntCounter,
47}
48
49impl<T: Copy + Debug> Debug for MetricsRegion<T> {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        std::fmt::Debug::fmt(self.buf.as_vec(), f)
52    }
53}
54
55impl<T: Copy> MetricsRegion<T> {
56    fn capacity_bytes(&self) -> usize {
57        self.buf.capacity() * std::mem::size_of::<T>()
58    }
59
60    /// Copy all of the elements from `slice` into the [`Region`].
61    ///
62    /// # Panics
63    ///
64    /// * If the [`Region`] does not have enough capacity.
65    pub fn extend_from_slice(&mut self, slice: &[T]) {
66        self.buf.extend_from_slice(slice);
67    }
68}
69
70impl<T: Copy + PartialEq> PartialEq for MetricsRegion<T> {
71    fn eq(&self, other: &Self) -> bool {
72        self.buf.as_vec() == other.buf.as_vec()
73    }
74}
75
76impl<T: Copy + Eq> Eq for MetricsRegion<T> {}
77
78impl<T: Copy> Drop for MetricsRegion<T> {
79    fn drop(&mut self) {
80        self.free_count.inc();
81        self.free_capacity_bytes
82            .inc_by(u64::cast_from(self.capacity_bytes()));
83    }
84}
85
86impl<T: Copy> AsRef<[T]> for MetricsRegion<T> {
87    fn as_ref(&self) -> &[T] {
88        &self.buf[..]
89    }
90}
91
92/// Metrics for lgalloc'd bytes..
93#[derive(Debug, Clone)]
94pub struct LgBytesMetrics {
95    /// Metrics for the "persist_azure" usage of lgalloc bytes.
96    pub persist_azure: LgBytesOpMetrics,
97    /// Metrics for the "persist_arrow" usage of lgalloc bytes.
98    pub persist_arrow: LgBytesOpMetrics,
99}
100
101/// Metrics for an individual usage of lgalloc bytes.
102#[derive(Clone)]
103pub struct LgBytesOpMetrics {
104    heap: LgBytesRegionMetrics,
105    mmap: LgBytesRegionMetrics,
106    alloc_seconds: Counter,
107    mmap_disabled_count: IntCounter,
108    mmap_error_count: IntCounter,
109    // NB: Unlike the _bytes per-Region metrics, which are capacity, this is
110    // intentionally the requested len.
111    len_sizes: Histogram,
112}
113
114impl std::fmt::Debug for LgBytesOpMetrics {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116        f.debug_struct("LgBytesOperationMetrics")
117            .finish_non_exhaustive()
118    }
119}
120
121#[derive(Clone)]
122struct LgBytesRegionMetrics {
123    alloc_count: IntCounter,
124    alloc_capacity_bytes: IntCounter,
125    free_count: IntCounter,
126    free_capacity_bytes: IntCounter,
127}
128
129impl LgBytesMetrics {
130    /// Returns a new [LgBytesMetrics] connected to the given metrics registry.
131    pub fn new(registry: &MetricsRegistry) -> Self {
132        let alloc_count: IntCounterVec = registry.register(metric!(
133            name: "mz_lgbytes_alloc_count",
134            help: "count of LgBytes allocations",
135            var_labels: ["op", "region"],
136        ));
137        let alloc_capacity_bytes: IntCounterVec = registry.register(metric!(
138            name: "mz_lgbytes_alloc_capacity_bytes",
139            help: "total capacity bytes of LgBytes allocations",
140            var_labels: ["op", "region"],
141        ));
142        let free_count: IntCounterVec = registry.register(metric!(
143            name: "mz_lgbytes_free_count",
144            help: "count of LgBytes frees",
145            var_labels: ["op", "region"],
146        ));
147        let free_capacity_bytes: IntCounterVec = registry.register(metric!(
148            name: "mz_lgbytes_free_capacity_bytes",
149            help: "total capacity bytes of LgBytes frees",
150            var_labels: ["op", "region"],
151        ));
152        let alloc_seconds: CounterVec = registry.register(metric!(
153            name: "mz_lgbytes_alloc_seconds",
154            help: "seconds spent getting LgBytes allocations and copying in data",
155            var_labels: ["op"],
156        ));
157        let mmap_disabled_count: IntCounter = registry.register(metric!(
158            name: "mz_bytes_mmap_disabled_count",
159            help: "count alloc attempts with lgalloc disabled",
160        ));
161        let mmap_error_count: IntCounter = registry.register(metric!(
162            name: "mz_bytes_mmap_error_count",
163            help: "count of errors when attempting file-based mapped alloc",
164        ));
165        let len_sizes: Histogram = registry.register(metric!(
166            name: "mz_bytes_alloc_len_sizes",
167            help: "histogram of LgBytes alloc len sizes",
168            buckets: crate::stats::HISTOGRAM_BYTE_BUCKETS.to_vec(),
169        ));
170        let op = |name: &str| LgBytesOpMetrics {
171            heap: LgBytesRegionMetrics {
172                alloc_count: alloc_count.with_label_values(&[name, "heap"]),
173                alloc_capacity_bytes: alloc_capacity_bytes.with_label_values(&[name, "heap"]),
174                free_count: free_count.with_label_values(&[name, "heap"]),
175                free_capacity_bytes: free_capacity_bytes.with_label_values(&[name, "heap"]),
176            },
177            mmap: LgBytesRegionMetrics {
178                alloc_count: alloc_count.with_label_values(&[name, "mmap"]),
179                alloc_capacity_bytes: alloc_capacity_bytes.with_label_values(&[name, "mmap"]),
180                free_count: free_count.with_label_values(&[name, "mmap"]),
181                free_capacity_bytes: free_capacity_bytes.with_label_values(&[name, "mmap"]),
182            },
183            alloc_seconds: alloc_seconds.with_label_values(&[name]),
184            mmap_disabled_count: mmap_disabled_count.clone(),
185            mmap_error_count: mmap_error_count.clone(),
186            len_sizes: len_sizes.clone(),
187        };
188        LgBytesMetrics {
189            persist_azure: op("persist_azure"),
190            persist_arrow: op("persist_arrow"),
191        }
192    }
193}
194
195impl LgBytesOpMetrics {
196    /// Returns a new empty [`MetricsRegion`] to hold at least `T` elements.
197    pub fn new_region<T: Copy>(&self, capacity: usize) -> MetricsRegion<T> {
198        let start = Instant::now();
199
200        // Round the capacity up to the minimum lgalloc mmap size.
201        let capacity = std::cmp::max(capacity, 1 << lgalloc::VALID_SIZE_CLASS.start);
202        let region = match Region::new_mmap(capacity) {
203            Ok(region) => region,
204            Err(err) => {
205                if let AllocError::Disabled = err {
206                    self.mmap_disabled_count.inc()
207                } else {
208                    debug!("failed to mmap allocate: {}", err);
209                    self.mmap_error_count.inc();
210                }
211                Region::new_heap(capacity)
212            }
213        };
214        let region = self.metrics_region(region);
215        self.alloc_seconds.inc_by(start.elapsed().as_secs_f64());
216
217        region
218    }
219
220    /// Attempts to copy the given bytes into an lgalloc-managed file-based mapped
221    /// region. If that fails, we return the original bytes.
222    pub fn try_mmap_bytes(&self, buf: Bytes) -> Bytes {
223        self.try_mmap_region(buf.as_ref())
224            .map(Bytes::from)
225            .unwrap_or(buf)
226    }
227
228    /// Attempts to copy the given buf into an lgalloc managed file-based mapped region.
229    pub fn try_mmap_region<T: Copy>(
230        &self,
231        buf: impl AsRef<[T]>,
232    ) -> Result<MetricsRegion<T>, AllocError> {
233        let start = Instant::now();
234        let buf = buf.as_ref();
235        // Round the capacity up to the minimum lgalloc mmap size.
236        let capacity = std::cmp::max(buf.len(), 1 << lgalloc::VALID_SIZE_CLASS.start);
237        let buf = match Region::new_mmap(capacity) {
238            Ok(mut region) => {
239                region.extend_from_slice(buf);
240                Ok(region)
241            }
242            Err(err) => {
243                match &err {
244                    AllocError::Disabled => self.mmap_disabled_count.inc(),
245                    err => {
246                        debug!("failed to mmap allocate: {}", err);
247                        self.mmap_error_count.inc();
248                    }
249                };
250                Err(err)
251            }
252        }?;
253        let region = self.metrics_region(buf);
254        self.alloc_seconds.inc_by(start.elapsed().as_secs_f64());
255        Ok(region)
256    }
257
258    /// Wraps the already owned buf into a [Region::Heap] with metrics.
259    ///
260    /// Besides metrics, this is essentially a no-op.
261    pub fn heap_region<T: Copy>(&self, buf: Vec<T>) -> MetricsRegion<T> {
262        // Intentionally don't bother incrementing alloc_seconds here.
263        self.metrics_region(Region::Heap(buf))
264    }
265
266    fn metrics_region<T: Copy>(&self, buf: Region<T>) -> MetricsRegion<T> {
267        let metrics = match buf {
268            Region::MMap(_) => &self.mmap,
269            Region::Heap(_) => &self.heap,
270        };
271        let region = MetricsRegion {
272            buf,
273            free_count: metrics.free_count.clone(),
274            free_capacity_bytes: metrics.free_capacity_bytes.clone(),
275        };
276        metrics.alloc_count.inc();
277        metrics
278            .alloc_capacity_bytes
279            .inc_by(u64::cast_from(region.capacity_bytes()));
280        let len_bytes = region.buf.len() * std::mem::size_of::<T>();
281        self.len_sizes.observe(f64::cast_lossy(len_bytes));
282        region
283    }
284}