mz_ore/
lgbytes.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! The [bytes] crate but backed by [lgalloc].
17
18use std::fmt::Debug;
19use std::time::Instant;
20
21use bytes::Bytes;
22use lgalloc::AllocError;
23use prometheus::{Counter, CounterVec, Histogram, IntCounter, IntCounterVec};
24use tracing::debug;
25
26use crate::cast::{CastFrom, CastLossy};
27use crate::metric;
28use crate::metrics::MetricsRegistry;
29use crate::region::Region;
30
31impl From<MetricsRegion<u8>> for Bytes {
32    fn from(bytes: MetricsRegion<u8>) -> Bytes {
33        // This will handle the drop correctly when the refcount goes to 0...
34        // see the rustdoc on this method for more details.
35        Bytes::from_owner(bytes)
36    }
37}
38
39/// A [Region] wrapper that increments metrics when it is dropped.
40///
41/// The `T: Copy` bound ensures that the `Region` doesn't leak resources when
42/// dropped.
43pub struct MetricsRegion<T: Copy> {
44    buf: Region<T>,
45    free_count: IntCounter,
46    free_capacity_bytes: IntCounter,
47}
48
49impl<T: Copy + Debug> Debug for MetricsRegion<T> {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        std::fmt::Debug::fmt(self.buf.as_vec(), f)
52    }
53}
54
55impl<T: Copy> MetricsRegion<T> {
56    fn capacity_bytes(&self) -> usize {
57        self.buf.capacity() * std::mem::size_of::<T>()
58    }
59
60    /// Copy all of the elements from `slice` into the [`Region`].
61    ///
62    /// # Panics
63    ///
64    /// * If the [`Region`] does not have enough capacity.
65    pub fn extend_from_slice(&mut self, slice: &[T]) {
66        self.buf.extend_from_slice(slice);
67    }
68}
69
70impl<T: Copy + PartialEq> PartialEq for MetricsRegion<T> {
71    fn eq(&self, other: &Self) -> bool {
72        self.buf.as_vec() == other.buf.as_vec()
73    }
74}
75
76impl<T: Copy + Eq> Eq for MetricsRegion<T> {}
77
78impl<T: Copy> Drop for MetricsRegion<T> {
79    fn drop(&mut self) {
80        self.free_count.inc();
81        self.free_capacity_bytes
82            .inc_by(u64::cast_from(self.capacity_bytes()));
83    }
84}
85
86impl<T: Copy> AsRef<[T]> for MetricsRegion<T> {
87    fn as_ref(&self) -> &[T] {
88        &self.buf[..]
89    }
90}
91
/// Metrics for lgalloc'd bytes, with one [`LgBytesOpMetrics`] per usage
/// ("op").
#[derive(Debug, Clone)]
pub struct LgBytesMetrics {
    /// Metrics for the "persist_s3" usage of lgalloc bytes.
    pub persist_s3: LgBytesOpMetrics,
    /// Metrics for the "persist_azure" usage of lgalloc bytes.
    pub persist_azure: LgBytesOpMetrics,
    /// Metrics for the "persist_arrow" usage of lgalloc bytes.
    pub persist_arrow: LgBytesOpMetrics,
}
102
/// Metrics for an individual usage of lgalloc bytes.
#[derive(Clone)]
pub struct LgBytesOpMetrics {
    // Alloc/free counters used for regions that are `Region::Heap`.
    heap: LgBytesRegionMetrics,
    // Alloc/free counters used for regions that are `Region::MMap`.
    mmap: LgBytesRegionMetrics,
    // Seconds spent getting allocations and copying in data.
    alloc_seconds: Counter,
    // Count of mmap attempts that failed with `AllocError::Disabled`.
    mmap_disabled_count: IntCounter,
    // Count of mmap attempts that failed with any other `AllocError`.
    mmap_error_count: IntCounter,
    // NB: Unlike the _bytes per-Region metrics, which are capacity, this is
    // intentionally the requested len.
    len_sizes: Histogram,
}
115
116impl std::fmt::Debug for LgBytesOpMetrics {
117    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118        f.debug_struct("LgBytesOperationMetrics")
119            .finish_non_exhaustive()
120    }
121}
122
/// Allocation and free counters for one ("op", "region") label combination.
#[derive(Clone)]
struct LgBytesRegionMetrics {
    // Number of regions handed out.
    alloc_count: IntCounter,
    // Total capacity, in bytes, of the regions handed out.
    alloc_capacity_bytes: IntCounter,
    // Number of regions dropped.
    free_count: IntCounter,
    // Total capacity, in bytes, of the regions dropped.
    free_capacity_bytes: IntCounter,
}
130
131impl LgBytesMetrics {
132    /// Returns a new [LgBytesMetrics] connected to the given metrics registry.
133    pub fn new(registry: &MetricsRegistry) -> Self {
134        let alloc_count: IntCounterVec = registry.register(metric!(
135            name: "mz_lgbytes_alloc_count",
136            help: "count of LgBytes allocations",
137            var_labels: ["op", "region"],
138        ));
139        let alloc_capacity_bytes: IntCounterVec = registry.register(metric!(
140            name: "mz_lgbytes_alloc_capacity_bytes",
141            help: "total capacity bytes of LgBytes allocations",
142            var_labels: ["op", "region"],
143        ));
144        let free_count: IntCounterVec = registry.register(metric!(
145            name: "mz_lgbytes_free_count",
146            help: "count of LgBytes frees",
147            var_labels: ["op", "region"],
148        ));
149        let free_capacity_bytes: IntCounterVec = registry.register(metric!(
150            name: "mz_lgbytes_free_capacity_bytes",
151            help: "total capacity bytes of LgBytes frees",
152            var_labels: ["op", "region"],
153        ));
154        let alloc_seconds: CounterVec = registry.register(metric!(
155            name: "mz_lgbytes_alloc_seconds",
156            help: "seconds spent getting LgBytes allocations and copying in data",
157            var_labels: ["op"],
158        ));
159        let mmap_disabled_count: IntCounter = registry.register(metric!(
160            name: "mz_bytes_mmap_disabled_count",
161            help: "count alloc attempts with lgalloc disabled",
162        ));
163        let mmap_error_count: IntCounter = registry.register(metric!(
164            name: "mz_bytes_mmap_error_count",
165            help: "count of errors when attempting file-based mapped alloc",
166        ));
167        let len_sizes: Histogram = registry.register(metric!(
168            name: "mz_bytes_alloc_len_sizes",
169            help: "histogram of LgBytes alloc len sizes",
170            buckets: crate::stats::HISTOGRAM_BYTE_BUCKETS.to_vec(),
171        ));
172        let op = |name: &str| LgBytesOpMetrics {
173            heap: LgBytesRegionMetrics {
174                alloc_count: alloc_count.with_label_values(&[name, "heap"]),
175                alloc_capacity_bytes: alloc_capacity_bytes.with_label_values(&[name, "heap"]),
176                free_count: free_count.with_label_values(&[name, "heap"]),
177                free_capacity_bytes: free_capacity_bytes.with_label_values(&[name, "heap"]),
178            },
179            mmap: LgBytesRegionMetrics {
180                alloc_count: alloc_count.with_label_values(&[name, "mmap"]),
181                alloc_capacity_bytes: alloc_capacity_bytes.with_label_values(&[name, "mmap"]),
182                free_count: free_count.with_label_values(&[name, "mmap"]),
183                free_capacity_bytes: free_capacity_bytes.with_label_values(&[name, "mmap"]),
184            },
185            alloc_seconds: alloc_seconds.with_label_values(&[name]),
186            mmap_disabled_count: mmap_disabled_count.clone(),
187            mmap_error_count: mmap_error_count.clone(),
188            len_sizes: len_sizes.clone(),
189        };
190        LgBytesMetrics {
191            persist_s3: op("persist_s3"),
192            persist_azure: op("persist_azure"),
193            persist_arrow: op("persist_arrow"),
194        }
195    }
196}
197
198impl LgBytesOpMetrics {
199    /// Returns a new empty [`MetricsRegion`] to hold at least `T` elements.
200    pub fn new_region<T: Copy>(&self, capacity: usize) -> MetricsRegion<T> {
201        let start = Instant::now();
202
203        // Round the capacity up to the minimum lgalloc mmap size.
204        let capacity = std::cmp::max(capacity, 1 << lgalloc::VALID_SIZE_CLASS.start);
205        let region = match Region::new_mmap(capacity) {
206            Ok(region) => region,
207            Err(err) => {
208                if let AllocError::Disabled = err {
209                    self.mmap_disabled_count.inc()
210                } else {
211                    debug!("failed to mmap allocate: {}", err);
212                    self.mmap_error_count.inc();
213                }
214                Region::new_heap(capacity)
215            }
216        };
217        let region = self.metrics_region(region);
218        self.alloc_seconds.inc_by(start.elapsed().as_secs_f64());
219
220        region
221    }
222
223    /// Attempts to copy the given bytes into an lgalloc-managed file-based mapped
224    /// region. If that fails, we return the original bytes.
225    pub fn try_mmap_bytes(&self, buf: Bytes) -> Bytes {
226        self.try_mmap_region(buf.as_ref())
227            .map(Bytes::from)
228            .unwrap_or(buf)
229    }
230
231    /// Attempts to copy the given buf into an lgalloc managed file-based mapped region.
232    pub fn try_mmap_region<T: Copy>(
233        &self,
234        buf: impl AsRef<[T]>,
235    ) -> Result<MetricsRegion<T>, AllocError> {
236        let start = Instant::now();
237        let buf = buf.as_ref();
238        // Round the capacity up to the minimum lgalloc mmap size.
239        let capacity = std::cmp::max(buf.len(), 1 << lgalloc::VALID_SIZE_CLASS.start);
240        let buf = match Region::new_mmap(capacity) {
241            Ok(mut region) => {
242                region.extend_from_slice(buf);
243                Ok(region)
244            }
245            Err(err) => {
246                match &err {
247                    AllocError::Disabled => self.mmap_disabled_count.inc(),
248                    err => {
249                        debug!("failed to mmap allocate: {}", err);
250                        self.mmap_error_count.inc();
251                    }
252                };
253                Err(err)
254            }
255        }?;
256        let region = self.metrics_region(buf);
257        self.alloc_seconds.inc_by(start.elapsed().as_secs_f64());
258        Ok(region)
259    }
260
261    /// Wraps the already owned buf into a [Region::Heap] with metrics.
262    ///
263    /// Besides metrics, this is essentially a no-op.
264    pub fn heap_region<T: Copy>(&self, buf: Vec<T>) -> MetricsRegion<T> {
265        // Intentionally don't bother incrementing alloc_seconds here.
266        self.metrics_region(Region::Heap(buf))
267    }
268
269    fn metrics_region<T: Copy>(&self, buf: Region<T>) -> MetricsRegion<T> {
270        let metrics = match buf {
271            Region::MMap(_) => &self.mmap,
272            Region::Heap(_) => &self.heap,
273        };
274        let region = MetricsRegion {
275            buf,
276            free_count: metrics.free_count.clone(),
277            free_capacity_bytes: metrics.free_capacity_bytes.clone(),
278        };
279        metrics.alloc_count.inc();
280        metrics
281            .alloc_capacity_bytes
282            .inc_by(u64::cast_from(region.capacity_bytes()));
283        let len_bytes = region.buf.len() * std::mem::size_of::<T>();
284        self.len_sizes.observe(f64::cast_lossy(len_bytes));
285        region
286    }
287}