lgalloc/lib.rs

1//! A size-classed file-backed large object allocator.
2//!
3//! This library contains types to allocate memory outside the heap,
4//! supporting power-of-two object sizes. Each size class has its own
5//! memory pool.
6//!
7//! # Safety
8//!
9//! This library is inherently unsafe: it relies on `unsafe` code and interacts directly
10//! with libc, including Linux extensions.
11//!
12//! The library relies on memory-mapped files. Users of this library must not fork the process,
13//! because the child would share the same mappings and the mutable pointers would no longer be
14//! unique, causing undefined behavior. Unfortunately, there is no way to tell the memory
15//! subsystem that the shared mappings must not be inherited.
16//!
17//! Clients must not lock pages (`mlock`), or must unlock the pages before returning them
18//! to lgalloc.
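//!
//! # Example
//!
//! A minimal usage sketch (not a complete program): enable lgalloc, point it at a directory
//! for its backing files, allocate a region, and return it.
//!
//! ```no_run
//! use lgalloc::{AllocError, LgAlloc};
//!
//! fn demo() -> Result<(), AllocError> {
//!     // Enable the allocator and tell it where to place its backing files.
//!     lgalloc::lgalloc_set_config(LgAlloc::new().enable().with_path(std::env::temp_dir()));
//!
//!     // Allocate space for at least 1024 `u64` values; the capacity can exceed the request.
//!     let (ptr, cap, handle) = lgalloc::allocate::<u64>(1024)?;
//!     assert!(cap >= 1024);
//!     // ... write to the region through `ptr` ...
//!     let _ = ptr;
//!
//!     // Return the memory; accessing `ptr` afterwards is undefined behavior.
//!     lgalloc::deallocate(handle);
//!     Ok(())
//! }
//! # let _ = demo();
//! ```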
19
20#![deny(missing_docs)]
21
22use std::cell::RefCell;
23use std::collections::HashMap;
24use std::fs::File;
25use std::marker::PhantomData;
26use std::mem::{take, ManuallyDrop, MaybeUninit};
27use std::ops::{Deref, Range};
28use std::os::fd::{AsFd, AsRawFd};
29use std::path::PathBuf;
30use std::ptr::NonNull;
31use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
32use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
33use std::sync::{Arc, Mutex, OnceLock, RwLock};
34use std::thread::{JoinHandle, ThreadId};
35use std::time::{Duration, Instant};
36
37use crossbeam_deque::{Injector, Steal, Stealer, Worker};
38use memmap2::MmapMut;
39use numa_maps::NumaMap;
40use thiserror::Error;
41
42mod readme {
43    #![doc = include_str!("../README.md")]
44}
45
46/// Handle to describe allocations.
47///
48/// Handles represent a leased allocation, which must be explicitly freed. Otherwise, the caller will permanently leak
49/// the associated memory.
50pub struct Handle {
51    /// The actual pointer.
52    ptr: NonNull<u8>,
53    /// Length of the allocation.
54    len: usize,
55}
56
57unsafe impl Send for Handle {}
58unsafe impl Sync for Handle {}
59
60#[allow(clippy::len_without_is_empty)]
61impl Handle {
62    /// Construct a new handle from a region of memory
63    fn new(ptr: NonNull<u8>, len: usize) -> Self {
64        Self { ptr, len }
65    }
66
67    /// Construct a dangling handle, which is only suitable for zero-sized types.
68    fn dangling() -> Self {
69        Self {
70            ptr: NonNull::dangling(),
71            len: 0,
72        }
73    }
74
75    fn is_dangling(&self) -> bool {
76        self.ptr == NonNull::dangling()
77    }
78
79    /// Length of the memory area in bytes.
80    fn len(&self) -> usize {
81        self.len
82    }
83
84    /// Pointer to memory.
85    fn as_non_null(&self) -> NonNull<u8> {
86        self.ptr
87    }
88
89    /// Indicate that the memory is not in use and that the OS can recycle it.
90    fn clear(&mut self) -> std::io::Result<()> {
91        // SAFETY: `MADV_DONTNEED_STRATEGY` guaranteed to be a valid argument.
92        unsafe { self.madvise(MADV_DONTNEED_STRATEGY) }
93    }
94
95    /// Indicate that the memory is not in use; unlike [`Self::clear`], always uses `MADV_DONTNEED`.
96    fn fast_clear(&mut self) -> std::io::Result<()> {
97        // SAFETY: `libc::MADV_DONTNEED` documented to be a valid argument.
98        unsafe { self.madvise(libc::MADV_DONTNEED) }
99    }
100
101    /// Call `madvise` on the memory region. Unsafe because `advice` is passed verbatim.
102    unsafe fn madvise(&self, advice: libc::c_int) -> std::io::Result<()> {
103        // SAFETY: Calling into `madvise`:
104        // * The ptr is page-aligned by construction.
105        // * The ptr + length is page-aligned by construction (not required but surprising otherwise)
106        // * Mapped shared and writable (for MADV_REMOVE),
107        // * Pages not locked.
108        // * The caller is responsible for passing a valid `advice` parameter.
109        let ptr = self.as_non_null().as_ptr().cast();
110        let ret = unsafe { libc::madvise(ptr, self.len, advice) };
111        if ret != 0 {
112            let err = std::io::Error::last_os_error();
113            return Err(err);
114        }
115        Ok(())
116    }
117}
118
119/// Initial file size in bytes.
120const INITIAL_SIZE: usize = 32 << 20;
121
122/// Range of valid size classes, covering regions from 1 KiB (class 10) to 64 GiB (class 36).
123pub const VALID_SIZE_CLASS: Range<usize> = 10..37;
124
125/// Strategy to indicate that the OS can reclaim pages
126// TODO: On Linux, we want to use MADV_REMOVE, but that's only supported
127// on file systems that support FALLOC_FL_PUNCH_HOLE. We should check
128// the return value and retry with MADV_DONTNEED on EOPNOTSUPP.
129#[cfg(target_os = "linux")]
130const MADV_DONTNEED_STRATEGY: libc::c_int = libc::MADV_REMOVE;
131
132#[cfg(not(target_os = "linux"))]
133const MADV_DONTNEED_STRATEGY: libc::c_int = libc::MADV_DONTNEED;
134
135type PhantomUnsyncUnsend<T> = PhantomData<*mut T>;
136
137/// Allocation errors
138#[derive(Error, Debug)]
139pub enum AllocError {
140    /// IO error, unrecoverable
141    #[error("I/O error")]
142    Io(#[from] std::io::Error),
143    /// Out of memory, meaning that the pool is exhausted.
144    #[error("Out of memory")]
145    OutOfMemory,
146    /// Size class too large or small
147    #[error("Invalid size class")]
148    InvalidSizeClass(usize),
149    /// Allocator disabled
150    #[error("Disabled by configuration")]
151    Disabled,
152    /// Failed to allocate memory that suits alignment properties.
153    #[error("Memory unsuitable for requested alignment")]
154    UnalignedMemory,
155}
156
157impl AllocError {
158    /// Check if this error is [`AllocError::Disabled`].
159    #[must_use]
160    pub fn is_disabled(&self) -> bool {
161        matches!(self, AllocError::Disabled)
162    }
163}
164
165/// Abstraction over size classes.
166#[derive(Clone, Copy)]
167struct SizeClass(usize);
168
169impl SizeClass {
170    const fn new_unchecked(value: usize) -> Self {
171        Self(value)
172    }
173
174    const fn index(self) -> usize {
175        self.0 - VALID_SIZE_CLASS.start
176    }
177
178    /// The size in bytes of this size class.
179    const fn byte_size(self) -> usize {
180        1 << self.0
181    }
182
183    const fn from_index(index: usize) -> Self {
184        Self(index + VALID_SIZE_CLASS.start)
185    }
186
187    /// Obtain a size class from a size in bytes.
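    ///
    /// For example, a request of 1500 bytes rounds up to 2048 = `1 << 11` and thus maps to
    /// size class 11, while requests of 512 bytes or fewer would map below
    /// [`VALID_SIZE_CLASS`] and are rejected.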
188    fn from_byte_size(byte_size: usize) -> Result<Self, AllocError> {
189        let class = byte_size.next_power_of_two().trailing_zeros() as usize;
190        class.try_into()
191    }
192
193    const fn from_byte_size_unchecked(byte_size: usize) -> Self {
194        Self::new_unchecked(byte_size.next_power_of_two().trailing_zeros() as usize)
195    }
196}
197
198impl TryFrom<usize> for SizeClass {
199    type Error = AllocError;
200
201    fn try_from(value: usize) -> Result<Self, Self::Error> {
202        if VALID_SIZE_CLASS.contains(&value) {
203            Ok(SizeClass(value))
204        } else {
205            Err(AllocError::InvalidSizeClass(value))
206        }
207    }
208}
209
210#[derive(Default, Debug)]
211struct AllocStats {
212    allocations: AtomicU64,
213    slow_path: AtomicU64,
214    refill: AtomicU64,
215    deallocations: AtomicU64,
216    clear_eager: AtomicU64,
217    clear_slow: AtomicU64,
218}
219
220/// Handle to the shared global state.
221static INJECTOR: OnceLock<GlobalStealer> = OnceLock::new();
222
223/// Switch to turn lgalloc on or off. Off by default.
224static LGALLOC_ENABLED: AtomicBool = AtomicBool::new(false);
225
226/// Enable eager returning of memory. Off by default.
227static LGALLOC_EAGER_RETURN: AtomicBool = AtomicBool::new(false);
228
229/// Dampener in the file growth rate. 0 corresponds to doubling and in general `n` to `1+1/(n+1)`.
230///
231/// Setting this to 0 results in creating files with doubling capacity.
232/// Larger numbers result in more conservative approaches that create more files.
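///
/// For example, with a dampener of 0 a 64 MiB file is followed by a 128 MiB file (factor 2),
/// while with a dampener of 1 it is followed by a 96 MiB file (factor 1.5).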
233static LGALLOC_FILE_GROWTH_DAMPENER: AtomicUsize = AtomicUsize::new(0);
234
235/// The size of allocations to retain locally, per thread and size class.
236static LOCAL_BUFFER_BYTES: AtomicUsize = AtomicUsize::new(32 << 20);
237
238/// Type maintaining the global state for each size class.
239struct GlobalStealer {
240    /// State for each size class. The entry at index `x` handles size class
241    /// `x + VALID_SIZE_CLASS.start`, which covers areas of size `1 << (x + VALID_SIZE_CLASS.start)`.
242    size_classes: Vec<SizeClassState>,
243    /// Path to store files
244    path: RwLock<Option<PathBuf>>,
245    /// Shared token to access background thread.
246    background_sender: Mutex<Option<(JoinHandle<()>, Sender<BackgroundWorkerConfig>)>>,
247}
248
249/// Per-size-class state
250#[derive(Default)]
251struct SizeClassState {
252    /// Handle to memory-mapped regions.
253    ///
254    /// We must never dereference the memory-mapped regions stored here.
255    areas: RwLock<Vec<ManuallyDrop<(File, MmapMut)>>>,
256    /// Injector to distribute memory globally.
257    injector: Injector<Handle>,
258    /// Injector to distribute clean memory globally, i.e., freshly mapped regions or regions returned to the OS.
259    clean_injector: Injector<Handle>,
260    /// Slow-path lock to refill pool.
261    lock: Mutex<()>,
262    /// Thread stealers to allow all participating threads to steal memory.
263    stealers: RwLock<HashMap<ThreadId, PerThreadState<Handle>>>,
264    /// Summed stats for terminated threads.
265    alloc_stats: AllocStats,
266    /// Total virtual size of all mappings in this size class in bytes.
267    total_bytes: AtomicUsize,
268    /// Count of areas backing this size class.
269    area_count: AtomicUsize,
270}
271
272impl GlobalStealer {
273    /// Obtain the shared global state.
274    fn get_static() -> &'static Self {
275        INJECTOR.get_or_init(Self::new)
276    }
277
278    /// Obtain the per-size-class global state.
279    fn get_size_class(&self, size_class: SizeClass) -> &SizeClassState {
280        &self.size_classes[size_class.index()]
281    }
282
283    fn new() -> Self {
284        let mut size_classes = Vec::with_capacity(VALID_SIZE_CLASS.len());
285
286        for _ in VALID_SIZE_CLASS {
287            size_classes.push(SizeClassState::default());
288        }
289
290        Self {
291            size_classes,
292            path: RwLock::default(),
293            background_sender: Mutex::default(),
294        }
295    }
296}
297
298impl Drop for GlobalStealer {
299    fn drop(&mut self) {
300        take(&mut self.size_classes);
301    }
302}
303
304struct PerThreadState<T> {
305    stealer: Stealer<T>,
306    alloc_stats: Arc<AllocStats>,
307}
308
309/// Per-thread state, sharded by size class.
310struct ThreadLocalStealer {
311    /// Per-size-class state
312    size_classes: Vec<LocalSizeClass>,
313    _phantom: PhantomUnsyncUnsend<Self>,
314}
315
316impl ThreadLocalStealer {
317    fn new() -> Self {
318        let thread_id = std::thread::current().id();
319        let size_classes = VALID_SIZE_CLASS
320            .map(|size_class| LocalSizeClass::new(SizeClass::new_unchecked(size_class), thread_id))
321            .collect();
322        Self {
323            size_classes,
324            _phantom: PhantomData,
325        }
326    }
327
328    /// Allocate a memory region from a specific size class.
329    ///
330    /// Returns [`AllocError::Disabled`] if lgalloc is not enabled. Returns other error types
331    /// if out of memory, or an internal operation fails.
332    fn allocate(&self, size_class: SizeClass) -> Result<Handle, AllocError> {
333        if !LGALLOC_ENABLED.load(Ordering::Relaxed) {
334            return Err(AllocError::Disabled);
335        }
336        self.size_classes[size_class.index()].get_with_refill()
337    }
338
339    /// Return memory to the allocator. Must have been obtained through [`allocate`].
340    fn deallocate(&self, mem: Handle) {
341        let size_class = SizeClass::from_byte_size_unchecked(mem.len());
342
343        self.size_classes[size_class.index()].push(mem);
344    }
345}
346
347thread_local! {
348    static WORKER: RefCell<ThreadLocalStealer> = RefCell::new(ThreadLocalStealer::new());
349}
350
351/// Per-thread, per-size-class state
352///
353/// # Safety
354///
355/// We store parts of areas in this struct. Leaking this struct leaks the areas, which is safe
356/// because we will never try to access or reclaim them.
357struct LocalSizeClass {
358    /// Local memory queue.
359    worker: Worker<Handle>,
360    /// Size class we're covering
361    size_class: SizeClass,
362    /// Handle to global size class state
363    size_class_state: &'static SizeClassState,
364    /// Owning thread's ID
365    thread_id: ThreadId,
366    /// Shared statistics maintained by this thread.
367    stats: Arc<AllocStats>,
368    /// Phantom data to prevent sending the type across thread boundaries.
369    _phantom: PhantomUnsyncUnsend<Self>,
370}
371
372impl LocalSizeClass {
373    /// Construct a new local size class state. Registers the worker with the global state.
374    fn new(size_class: SizeClass, thread_id: ThreadId) -> Self {
375        let worker = Worker::new_lifo();
376        let stealer = GlobalStealer::get_static();
377        let size_class_state = stealer.get_size_class(size_class);
378
379        let stats = Arc::new(AllocStats::default());
380
381        let mut lock = size_class_state.stealers.write().expect("lock poisoned");
382        lock.insert(
383            thread_id,
384            PerThreadState {
385                stealer: worker.stealer(),
386                alloc_stats: Arc::clone(&stats),
387            },
388        );
389
390        Self {
391            worker,
392            size_class,
393            size_class_state,
394            thread_id,
395            stats,
396            _phantom: PhantomData,
397        }
398    }
399
400    /// Get a memory area. Tries to get a region from the local cache before obtaining memory
401    /// from the global state. As a last resort, steals memory from other workers.
402    ///
403    /// Returns [`AllocError::OutOfMemory`] if all pools are empty.
404    #[inline]
405    fn get(&self) -> Result<Handle, AllocError> {
406        self.worker
407            .pop()
408            .or_else(|| {
409                std::iter::repeat_with(|| {
410                    // The loop tries to obtain memory in the following order:
411                    // 1. Memory from the global state,
412                    // 2. Memory from the global cleaned state,
413                    // 3. Memory from other threads.
414                    let limit = 1.max(
415                        LOCAL_BUFFER_BYTES.load(Ordering::Relaxed)
416                            / self.size_class.byte_size()
417                            / 2,
418                    );
419
420                    self.size_class_state
421                        .injector
422                        .steal_batch_with_limit_and_pop(&self.worker, limit)
423                        .or_else(|| {
424                            self.size_class_state
425                                .clean_injector
426                                .steal_batch_with_limit_and_pop(&self.worker, limit)
427                        })
428                        .or_else(|| {
429                            self.size_class_state
430                                .stealers
431                                .read()
432                                .expect("lock poisoned")
433                                .values()
434                                .map(|state| state.stealer.steal())
435                                .collect()
436                        })
437                })
438                .find(|s| !s.is_retry())
439                .and_then(Steal::success)
440            })
441            .ok_or(AllocError::OutOfMemory)
442    }
443
444    /// Like [`Self::get()`], but tries to refill the pool if it is empty.
445    fn get_with_refill(&self) -> Result<Handle, AllocError> {
446        self.stats.allocations.fetch_add(1, Ordering::Relaxed);
447        // Fast-path: Get non-blocking
448        match self.get() {
449            Err(AllocError::OutOfMemory) => {
450                self.stats.slow_path.fetch_add(1, Ordering::Relaxed);
451                // Get a slow-path lock
452                let _lock = self.size_class_state.lock.lock().expect("lock poisoned");
453                // Try again because another thread might have refilled already
454                if let Ok(mem) = self.get() {
455                    return Ok(mem);
456                }
457                self.try_refill_and_get()
458            }
459            r => r,
460        }
461    }
462
463    /// Recycle memory. Stores it locally or forwards it to the global state.
464    fn push(&self, mut mem: Handle) {
465        debug_assert_eq!(mem.len(), self.size_class.byte_size());
466        self.stats.deallocations.fetch_add(1, Ordering::Relaxed);
467        if self.worker.len()
468            >= LOCAL_BUFFER_BYTES.load(Ordering::Relaxed) / self.size_class.byte_size()
469        {
470            if LGALLOC_EAGER_RETURN.load(Ordering::Relaxed) {
471                self.stats.clear_eager.fetch_add(1, Ordering::Relaxed);
472                mem.fast_clear().expect("clearing successful");
473            }
474            self.size_class_state.injector.push(mem);
475        } else {
476            self.worker.push(mem);
477        }
478    }
479
480    /// Refill the memory pool, and get one area.
481    ///
482    /// Returns an error if the memory pool cannot be refilled.
483    fn try_refill_and_get(&self) -> Result<Handle, AllocError> {
484        self.stats.refill.fetch_add(1, Ordering::Relaxed);
485        let mut stash = self.size_class_state.areas.write().expect("lock poisoned");
486
487        let initial_capacity = std::cmp::max(1, INITIAL_SIZE / self.size_class.byte_size());
488
489        let last_capacity =
490            stash.iter().last().map_or(0, |mmap| mmap.1.len()) / self.size_class.byte_size();
491        let growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.load(Ordering::Relaxed);
492        // We would like to grow the file capacity by a factor of `1+1/(growth_dampener+1)`,
493        // but at least by `initial_capacity`.
494        let next_capacity = last_capacity
495            + std::cmp::max(
496                initial_capacity,
497                last_capacity / (growth_dampener.saturating_add(1)),
498            );
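        // For example, with `initial_capacity = 64` and a dampener of 1, capacities (in regions)
        // grow as 64, 128, 192, 288, 432, ...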
499
500        let next_byte_len = next_capacity * self.size_class.byte_size();
501        let (file, mut mmap) = Self::init_file(next_byte_len)?;
502
503        self.size_class_state
504            .total_bytes
505            .fetch_add(next_byte_len, Ordering::Relaxed);
506        self.size_class_state
507            .area_count
508            .fetch_add(1, Ordering::Relaxed);
509
510        // SAFETY: Memory region initialized, so pointers to it are valid.
511        let mut chunks = mmap
512            .as_mut()
513            .chunks_exact_mut(self.size_class.byte_size())
514            .map(|chunk| NonNull::new(chunk.as_mut_ptr()).expect("non-null"));
515
516        // Capture first region to return immediately.
517        let ptr = chunks.next().expect("At least one chunk allocated.");
518        let mem = Handle::new(ptr, self.size_class.byte_size());
519
520        // Stash remaining in the injector.
521        for ptr in chunks {
522            self.size_class_state
523                .clean_injector
524                .push(Handle::new(ptr, self.size_class.byte_size()));
525        }
526
527        stash.push(ManuallyDrop::new((file, mmap)));
528        Ok(mem)
529    }
530
531    /// Allocate and map a file of size `byte_len`. Returns the file and its mapping, or an
532    /// error if the allocation fails.
533    fn init_file(byte_len: usize) -> Result<(File, MmapMut), AllocError> {
534        let file = {
535            let path = GlobalStealer::get_static()
536                .path
537                .read()
538                .expect("lock poisoned");
539            let Some(path) = &*path else {
540                return Err(AllocError::Io(std::io::Error::from(
541                    std::io::ErrorKind::NotFound,
542                )));
543            };
544            tempfile::tempfile_in(path)?
545        };
546        let fd = file.as_fd().as_raw_fd();
547        let length = libc::off_t::try_from(byte_len).expect("Must fit");
548        // SAFETY: Calling ftruncate on the file, which we just created.
549        let ret = unsafe { libc::ftruncate(fd, length) };
550        if ret != 0 {
551            // file goes out of scope here, so no need for further cleanup.
552            return Err(std::io::Error::last_os_error().into());
553        }
554        // SAFETY: We only map `file` once, and never share it with other processes.
555        let mmap = unsafe { memmap2::MmapOptions::new().map_mut(&file)? };
556        assert_eq!(mmap.len(), byte_len);
557        Ok((file, mmap))
558    }
559}
560
561impl Drop for LocalSizeClass {
562    fn drop(&mut self) {
563        // Remove state associated with thread
564        if let Ok(mut lock) = self.size_class_state.stealers.write() {
565            lock.remove(&self.thread_id);
566        }
567
568        // Send memory back to global state
569        while let Some(mem) = self.worker.pop() {
570            self.size_class_state.injector.push(mem);
571        }
572
573        let ordering = Ordering::Relaxed;
574
575        // Update global metrics by moving all worker-local metrics to global state.
576        self.size_class_state
577            .alloc_stats
578            .allocations
579            .fetch_add(self.stats.allocations.load(ordering), ordering);
580        let global_stats = &self.size_class_state.alloc_stats;
581        global_stats
582            .refill
583            .fetch_add(self.stats.refill.load(ordering), ordering);
584        global_stats
585            .slow_path
586            .fetch_add(self.stats.slow_path.load(ordering), ordering);
587        global_stats
588            .deallocations
589            .fetch_add(self.stats.deallocations.load(ordering), ordering);
590        global_stats
591            .clear_slow
592            .fetch_add(self.stats.clear_slow.load(ordering), ordering);
593        global_stats
594            .clear_eager
595            .fetch_add(self.stats.clear_eager.load(ordering), ordering);
596    }
597}
598
599/// Access the per-thread context.
600fn thread_context<R, F: FnOnce(&ThreadLocalStealer) -> R>(f: F) -> R {
601    WORKER.with(|cell| f(&cell.borrow()))
602}
603
604/// Allocate a memory area suitable to hold `capacity` consecutive elements of `T`.
605///
606/// Returns a pointer, a capacity in elements of `T`, and a handle on success, or an error
607/// otherwise. The capacity can be larger than requested.
608///
609/// The memory must be freed using [`deallocate`], otherwise the memory leaks. The memory can be freed on a different thread.
610///
611/// # Errors
612///
613/// `allocate` returns an error if the capacity cannot be supported by one of the size classes,
614/// if the alignment requirements of `T` cannot be fulfilled, if no more memory can be
615/// obtained from the system, or if a syscall fails.
616///
617/// The function also returns an error if lgalloc is disabled.
618///
619/// In the case of an error, no memory is allocated, and we maintain the internal
620/// invariants of the allocator.
621///
622/// # Panics
623///
624/// The function can panic on internal errors, specifically when an allocation returned
625/// an unexpected size. In this case, we do not maintain the allocator invariants
626/// and the caller should abort the process.
627///
628/// Panics if the thread local variable has been dropped, see [`std::thread::LocalKey`]
629/// for details.
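///
/// # Example
///
/// A sketch of a round trip through the allocator; it assumes lgalloc has been enabled and
/// configured with a path, otherwise `allocate` returns [`AllocError::Disabled`].
///
/// ```no_run
/// # fn main() -> Result<(), lgalloc::AllocError> {
/// let (ptr, cap, handle) = lgalloc::allocate::<u64>(1 << 10)?;
/// assert!(cap >= 1 << 10);
/// // The region is uninitialized; write through `ptr` before reading.
/// unsafe { ptr.as_ptr().write(42) };
/// lgalloc::deallocate(handle);
/// # Ok(())
/// # }
/// ```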
630pub fn allocate<T>(capacity: usize) -> Result<(NonNull<T>, usize, Handle), AllocError> {
631    if std::mem::size_of::<T>() == 0 {
632        return Ok((NonNull::dangling(), usize::MAX, Handle::dangling()));
633    } else if capacity == 0 {
634        return Ok((NonNull::dangling(), 0, Handle::dangling()));
635    }
636
637    // Round up to at least a page.
638    let byte_len = std::cmp::max(page_size::get(), std::mem::size_of::<T>() * capacity);
639    // With above rounding up to page sizes, we only allocate multiples of page size because
640    // we only support powers-of-two sized regions.
641    let size_class = SizeClass::from_byte_size(byte_len)?;
642
643    let handle = thread_context(|s| s.allocate(size_class))?;
644    debug_assert_eq!(handle.len(), size_class.byte_size());
645    let ptr: NonNull<T> = handle.as_non_null().cast();
646    // Memory region should be page-aligned, which we assume to be larger than any alignment
647    // we might encounter. If this is not the case, bail out.
648    if ptr.as_ptr().align_offset(std::mem::align_of::<T>()) != 0 {
649        thread_context(move |s| s.deallocate(handle));
650        return Err(AllocError::UnalignedMemory);
651    }
652    let actual_capacity = handle.len() / std::mem::size_of::<T>();
653    Ok((ptr, actual_capacity, handle))
654}
655
656/// Free the memory referenced by `handle`, which has been obtained from [`allocate`].
657///
658/// This function cannot fail. The caller must not access the memory after freeing it. The caller is responsible
659/// for dropping/forgetting data.
660///
661/// # Panics
662///
663/// Panics if the thread local variable has been dropped, see [`std::thread::LocalKey`]
664/// for details.
665pub fn deallocate(handle: Handle) {
666    if handle.is_dangling() {
667        return;
668    }
669    thread_context(|s| s.deallocate(handle));
670}
671
672/// A background worker that performs periodic tasks.
673struct BackgroundWorker {
674    config: BackgroundWorkerConfig,
675    receiver: Receiver<BackgroundWorkerConfig>,
676    global_stealer: &'static GlobalStealer,
677    worker: Worker<Handle>,
678}
679
680impl BackgroundWorker {
681    fn new(receiver: Receiver<BackgroundWorkerConfig>) -> Self {
682        let config = BackgroundWorkerConfig {
683            interval: Duration::MAX,
684            ..Default::default()
685        };
686        let global_stealer = GlobalStealer::get_static();
687        let worker = Worker::new_fifo();
688        Self {
689            config,
690            receiver,
691            global_stealer,
692            worker,
693        }
694    }
695
696    fn run(&mut self) {
697        let mut next_cleanup: Option<Instant> = None;
698        loop {
699            let timeout = next_cleanup.map_or(Duration::MAX, |next_cleanup| {
700                next_cleanup.saturating_duration_since(Instant::now())
701            });
702            match self.receiver.recv_timeout(timeout) {
703                Ok(config) => {
704                    self.config = config;
705                    next_cleanup = None;
706                }
707                Err(RecvTimeoutError::Disconnected) => break,
708                Err(RecvTimeoutError::Timeout) => {
709                    self.maintenance();
710                }
711            }
712            next_cleanup = next_cleanup
713                .unwrap_or_else(Instant::now)
714                .checked_add(self.config.interval);
715        }
716    }
717
718    fn maintenance(&self) {
719        for (index, size_class_state) in self.global_stealer.size_classes.iter().enumerate() {
720            let size_class = SizeClass::from_index(index);
721            let count = self.clear(size_class, size_class_state, &self.worker);
722            size_class_state
723                .alloc_stats
724                .clear_slow
725                .fetch_add(count.try_into().expect("must fit"), Ordering::Relaxed);
726        }
727    }
728
729    fn clear(
730        &self,
731        size_class: SizeClass,
732        state: &SizeClassState,
733        worker: &Worker<Handle>,
734    ) -> usize {
735        // Number of regions to clear per batch, rounded up.
736        let byte_size = size_class.byte_size();
737        let mut limit = (self.config.clear_bytes + byte_size - 1) / byte_size;
738        let mut count = 0;
739        let mut steal = Steal::Retry;
740        while limit > 0 && !steal.is_empty() {
741            steal = std::iter::repeat_with(|| state.injector.steal_batch_with_limit(worker, limit))
742                .find(|s| !s.is_retry())
743                .unwrap_or(Steal::Empty);
744            while let Some(mut mem) = worker.pop() {
745                match mem.clear() {
746                    Ok(()) => count += 1,
747                    Err(e) => panic!("Syscall failed: {e:?}"),
748                }
749                state.clean_injector.push(mem);
750                limit -= 1;
751            }
752        }
753        count
754    }
755}
756
757/// Set or update the configuration for lgalloc.
758///
759/// The function accepts a configuration, which is then applied to lgalloc. It allows clients to
760/// change the path where area files reside, and to change the configuration of the background task.
761///
762/// Updating the area path only applies to new allocations; existing allocations are not moved to
763/// the new path.
764///
765/// Updating the background thread configuration eventually applies the new configuration to the
766/// running thread, or starts the background worker.
767///
768/// # Panics
769///
770/// Panics if the internal state of lgalloc is corrupted.
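///
/// # Example
///
/// A configuration sketch; the path, interval, and byte counts are arbitrary example values.
///
/// ```no_run
/// use std::time::Duration;
///
/// lgalloc::lgalloc_set_config(
///     lgalloc::LgAlloc::new()
///         .enable()
///         .with_path(std::env::temp_dir())
///         .with_background_config(lgalloc::BackgroundWorkerConfig {
///             interval: Duration::from_secs(1),
///             clear_bytes: 32 << 20,
///         }),
/// );
/// ```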
771pub fn lgalloc_set_config(config: &LgAlloc) {
772    let stealer = GlobalStealer::get_static();
773
774    if let Some(enabled) = &config.enabled {
775        LGALLOC_ENABLED.store(*enabled, Ordering::Relaxed);
776    }
777
778    if let Some(eager_return) = &config.eager_return {
779        LGALLOC_EAGER_RETURN.store(*eager_return, Ordering::Relaxed);
780    }
781
782    if let Some(path) = &config.path {
783        *stealer.path.write().expect("lock poisoned") = Some(path.clone());
784    }
785
786    if let Some(file_growth_dampener) = &config.file_growth_dampener {
787        LGALLOC_FILE_GROWTH_DAMPENER.store(*file_growth_dampener, Ordering::Relaxed);
788    }
789
790    if let Some(local_buffer_bytes) = &config.local_buffer_bytes {
791        LOCAL_BUFFER_BYTES.store(*local_buffer_bytes, Ordering::Relaxed);
792    }
793
794    if let Some(config) = config.background_config.clone() {
795        let mut lock = stealer.background_sender.lock().expect("lock poisoned");
796
797        let config = if let Some((_, sender)) = &*lock {
798            match sender.send(config) {
799                Ok(()) => None,
800                Err(err) => Some(err.0),
801            }
802        } else {
803            Some(config)
804        };
805        if let Some(config) = config {
806            let (sender, receiver) = std::sync::mpsc::channel();
807            let mut worker = BackgroundWorker::new(receiver);
808            let join_handle = std::thread::Builder::new()
809                .name("lgalloc-0".to_string())
810                .spawn(move || worker.run())
811                .expect("thread started successfully");
812            sender.send(config).expect("Receiver exists");
813            *lock = Some((join_handle, sender));
814        }
815    }
816}
817
818/// Configuration for lgalloc's background worker.
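///
/// On each tick of `interval`, the background worker returns up to roughly `clear_bytes` of
/// pooled free memory per size class to the OS. A sketch with arbitrary example values:
///
/// ```
/// let _config = lgalloc::BackgroundWorkerConfig {
///     interval: std::time::Duration::from_secs(1),
///     clear_bytes: 64 << 20,
/// };
/// ```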
819#[derive(Default, Debug, Clone, Eq, PartialEq)]
820pub struct BackgroundWorkerConfig {
821    /// How frequently the background worker should tick.
822    pub interval: Duration,
823    /// How many bytes to clear per size class on each tick.
824    pub clear_bytes: usize,
825}
826
827/// Lgalloc configuration
828#[derive(Default, Clone, Eq, PartialEq)]
829pub struct LgAlloc {
830    /// Whether the allocator is enabled or not.
831    pub enabled: Option<bool>,
832    /// Path where files reside.
833    pub path: Option<PathBuf>,
834    /// Configuration of the background worker.
835    pub background_config: Option<BackgroundWorkerConfig>,
836    /// Whether to return physical memory on deallocate
837    pub eager_return: Option<bool>,
838    /// Dampener in the file growth rate. 0 corresponds to doubling and in general `n` to `1+1/(n+1)`.
839    pub file_growth_dampener: Option<usize>,
840    /// Size of the per-thread per-size class cache, in bytes.
841    pub local_buffer_bytes: Option<usize>,
842}
843
844impl LgAlloc {
845    /// Construct a new configuration. All values are initialized to their default (None) values.
846    #[must_use]
847    pub fn new() -> Self {
848        Self::default()
849    }
850
851    /// Enable lgalloc globally.
852    pub fn enable(&mut self) -> &mut Self {
853        self.enabled = Some(true);
854        self
855    }
856
857    /// Disable lgalloc globally.
858    pub fn disable(&mut self) -> &mut Self {
859        self.enabled = Some(false);
860        self
861    }
862
863    /// Set the background worker configuration.
864    pub fn with_background_config(&mut self, config: BackgroundWorkerConfig) -> &mut Self {
865        self.background_config = Some(config);
866        self
867    }
868
869    /// Set the area file path.
870    pub fn with_path(&mut self, path: PathBuf) -> &mut Self {
871        self.path = Some(path);
872        self
873    }
874
875    /// Enable eager memory reclamation.
876    pub fn eager_return(&mut self, eager_return: bool) -> &mut Self {
877        self.eager_return = Some(eager_return);
878        self
879    }
880
881    /// Set the file growth dampener.
882    pub fn file_growth_dampener(&mut self, file_growth_dampener: usize) -> &mut Self {
883        self.file_growth_dampener = Some(file_growth_dampener);
884        self
885    }
886
887    /// Set the local buffer size.
888    pub fn local_buffer_bytes(&mut self, local_buffer_bytes: usize) -> &mut Self {
889        self.local_buffer_bytes = Some(local_buffer_bytes);
890        self
891    }
892}
893
894/// Determine global statistics per size class.
895///
896/// This function is supposed to be relatively fast. It causes some syscalls, but they
897/// should be cheap (stat on a file descriptor).
898///
899/// Note that this function takes read locks on various structures. It calls `fstat` while
900/// holding a read lock on portions of the global state, which can block refills until the
901/// function returns.
902///
903/// # Panics
904///
905/// Panics if the internal state of lgalloc is corrupted.
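///
/// # Example
///
/// A sketch that prints the total bytes mapped per size class:
///
/// ```no_run
/// let stats = lgalloc::lgalloc_stats();
/// for (size_class, class_stats) in &stats.size_class {
///     println!("size class {size_class}: {} bytes mapped", class_stats.area_total_bytes);
/// }
/// ```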
906pub fn lgalloc_stats() -> LgAllocStats {
907    LgAllocStats::read(None)
908}
909
910/// Determine global statistics per size class, and include mapping information.
911///
912/// This function can be very slow as it needs to read the `numa_maps` file. Depending
913/// on the heap size of the program, this can take seconds to minutes, so call this
914/// function with care.
915///
916/// Note that this function takes read locks on various structures. In addition to the locks
917/// described for [`lgalloc_stats`], this function reads the `/proc/self/numa_maps` file without
918/// holding any locks, but the kernel might block other memory operations while reading this file.
919///
920/// # Panics
921///
922/// Panics if the internal state of lgalloc is corrupted.
923pub fn lgalloc_stats_with_mapping() -> std::io::Result<LgAllocStats> {
924    let mut numa_map = NumaMap::from_file("/proc/self/numa_maps")?;
925    Ok(LgAllocStats::read(Some(&mut numa_map)))
926}
927
928/// Statistics about lgalloc's internal behavior.
929#[derive(Debug)]
930pub struct LgAllocStats {
931    /// Per size-class statistics.
932    pub size_class: Vec<(usize, SizeClassStats)>,
933    /// Per size-class and backing file statistics. Each entry identifies the
934    /// size class it describes, and there can be multiple entries for each size class.
935    pub file: Vec<(usize, std::io::Result<FileStats>)>,
936    /// Per size-class and map statistics. Each entry identifies the
937    /// size class it describes, and there can be multiple entries for each size class.
938    pub map: Option<Vec<(usize, MapStats)>>,
939}
940
941impl LgAllocStats {
942    /// Read lgalloc statistics.
943    ///
944    /// Supply a `numa_map` to obtain mapping stats.
945    fn read(mut numa_map: Option<&mut NumaMap>) -> Self {
946        let global = GlobalStealer::get_static();
947
948        if let Some(numa_map) = numa_map.as_mut() {
949            // Normalize numa_maps, and sort by address.
950            for entry in &mut numa_map.ranges {
951                entry.normalize();
952            }
953            numa_map.ranges.sort();
954        }
955
956        let mut size_class_stats = Vec::with_capacity(VALID_SIZE_CLASS.len());
957        let mut file_stats = Vec::default();
958        let mut map_stats = Vec::default();
959        for (index, state) in global.size_classes.iter().enumerate() {
960            let size_class = SizeClass::from_index(index);
961            let size_class_bytes = size_class.byte_size();
962
963            size_class_stats.push((size_class_bytes, SizeClassStats::from(state)));
964
965            let areas = state.areas.read().expect("lock poisoned");
966            for (file, mmap) in areas.iter().map(Deref::deref) {
967                file_stats.push((size_class_bytes, FileStats::extract_from(file)));
968                if let Some(numa_map) = numa_map.as_deref() {
969                    map_stats.push((size_class_bytes, MapStats::extract_from(mmap, numa_map)));
970                }
971            }
972        }
973
974        Self {
975            size_class: size_class_stats,
976            file: file_stats,
977            map: numa_map.map(|_| map_stats),
978        }
979    }
980}
981
982/// Statistics per size class.
983#[derive(Debug)]
984pub struct SizeClassStats {
985    /// Number of areas backing a size class.
986    pub areas: usize,
987    /// Total number of bytes summed across all areas.
988    pub area_total_bytes: usize,
989    /// Free regions
990    pub free_regions: usize,
991    /// Clean free regions in the global allocator
992    pub clean_regions: usize,
993    /// Regions in the global allocator
994    pub global_regions: usize,
995    /// Regions retained in thread-local allocators
996    pub thread_regions: usize,
997    /// Total allocations
998    pub allocations: u64,
999    /// Total slow-path allocations (globally out of memory)
1000    pub slow_path: u64,
1001    /// Total refills
1002    pub refill: u64,
1003    /// Total deallocations
1004    pub deallocations: u64,
1005    /// Total number of regions returned to the OS through eager reclamation.
1006    pub clear_eager_total: u64,
1007    /// Total number of regions returned to the OS through slow (background) reclamation.
1008    pub clear_slow_total: u64,
1009}
1010
1011impl From<&SizeClassState> for SizeClassStats {
1012    fn from(size_class_state: &SizeClassState) -> Self {
1013        let areas = size_class_state.area_count.load(Ordering::Relaxed);
1014        let area_total_bytes = size_class_state.total_bytes.load(Ordering::Relaxed);
1015        let global_regions = size_class_state.injector.len();
1016        let clean_regions = size_class_state.clean_injector.len();
1017        let stealers = size_class_state.stealers.read().expect("lock poisoned");
1018        let mut thread_regions = 0;
1019        let mut allocations = 0;
1020        let mut deallocations = 0;
1021        let mut refill = 0;
1022        let mut slow_path = 0;
1023        let mut clear_eager_total = 0;
1024        let mut clear_slow_total = 0;
1025        for thread_state in stealers.values() {
1026            thread_regions += thread_state.stealer.len();
1027            let thread_stats = &*thread_state.alloc_stats;
1028            allocations += thread_stats.allocations.load(Ordering::Relaxed);
1029            deallocations += thread_stats.deallocations.load(Ordering::Relaxed);
1030            refill += thread_stats.refill.load(Ordering::Relaxed);
1031            slow_path += thread_stats.slow_path.load(Ordering::Relaxed);
1032            clear_eager_total += thread_stats.clear_eager.load(Ordering::Relaxed);
1033            clear_slow_total += thread_stats.clear_slow.load(Ordering::Relaxed);
1034        }
1035
1036        let free_regions = thread_regions + global_regions + clean_regions;
1037
1038        let global_stats = &size_class_state.alloc_stats;
1039        allocations += global_stats.allocations.load(Ordering::Relaxed);
1040        deallocations += global_stats.deallocations.load(Ordering::Relaxed);
1041        refill += global_stats.refill.load(Ordering::Relaxed);
1042        slow_path += global_stats.slow_path.load(Ordering::Relaxed);
1043        clear_eager_total += global_stats.clear_eager.load(Ordering::Relaxed);
1044        clear_slow_total += global_stats.clear_slow.load(Ordering::Relaxed);
1045        Self {
1046            areas,
1047            area_total_bytes,
1048            free_regions,
1049            global_regions,
1050            clean_regions,
1051            thread_regions,
1052            allocations,
1053            deallocations,
1054            refill,
1055            slow_path,
1056            clear_eager_total,
1057            clear_slow_total,
1058        }
1059    }
1060}
1061
1062/// Statistics per size class and backing file.
1063#[derive(Debug)]
1064pub struct FileStats {
1065    /// The size of the file in bytes.
1066    pub file_size: usize,
1067    /// Size of the file on disk in bytes.
1068    pub allocated_size: usize,
1069}
1070
1071impl FileStats {
1072    /// Extract file statistics from a file descriptor. Calls `fstat` on the file to obtain
1073    /// the size and allocated size.
1074    fn extract_from(file: &File) -> std::io::Result<Self> {
1075        let mut stat: MaybeUninit<libc::stat> = MaybeUninit::uninit();
1076        // SAFETY: File descriptor valid, stat object valid.
1077        let ret = unsafe { libc::fstat(file.as_raw_fd(), stat.as_mut_ptr()) };
1078        if ret == -1 {
1079            Err(std::io::Error::last_os_error())
1080        } else {
1081            // SAFETY: `stat` is initialized in the fstat non-error case.
1082            let stat = unsafe { stat.assume_init_ref() };
1083            let blocks = stat.st_blocks.try_into().unwrap_or(0);
1084            let file_size = stat.st_size.try_into().unwrap_or(0);
1085            Ok(FileStats {
1086                file_size,
1087                // Documented as multiples of 512
1088                allocated_size: blocks * 512,
1089            })
1090        }
1091    }
1092}
1093
1094/// Statistics per size class and mapping.
1095#[derive(Debug)]
1096pub struct MapStats {
1097    /// Number of mapped bytes, if different from `dirty`. Consult `man 7 numa` for details.
1098    pub mapped: usize,
1099    /// Number of active bytes. Consult `man 7 numa` for details.
1100    pub active: usize,
1101    /// Number of dirty bytes. Consult `man 7 numa` for details.
1102    pub dirty: usize,
1103}
1104
1105impl MapStats {
1106    /// Extract memory map stats for `mmap` based on `numa_map`.
1107    ///
1108    /// The ranges in the `numa_maps` file must be sorted by address and normalized.
1109    fn extract_from(mmap: &MmapMut, numa_map: &NumaMap) -> Self {
1110        // TODO: Use `addr` once our MSRV is 1.84.
1111        let base = mmap.as_ptr().cast::<()>() as usize;
1112        let range = match numa_map
1113            .ranges
1114            .binary_search_by(|range| range.address.cmp(&base))
1115        {
1116            Ok(pos) => Some(&numa_map.ranges[pos]),
1117            // `numa_maps` only updates periodically, so we might be missing some
1118            // expected ranges.
1119            Err(_pos) => None,
1120        };
1121
1122        let mut mapped = 0;
1123        let mut active = 0;
1124        let mut dirty = 0;
1125        for property in range.iter().flat_map(|e| e.properties.iter()) {
1126            match property {
1127                numa_maps::Property::Dirty(d) => dirty = *d,
1128                numa_maps::Property::Mapped(m) => mapped = *m,
1129                numa_maps::Property::Active(a) => active = *a,
1130                _ => {}
1131            }
1132        }
1133
1134        Self {
1135            mapped,
1136            active,
1137            dirty,
1138        }
1139    }
1140}
1141
1142#[cfg(test)]
1143mod test {
1144    use std::mem::{ManuallyDrop, MaybeUninit};
1145    use std::ptr::NonNull;
1146    use std::sync::atomic::{AtomicBool, Ordering};
1147    use std::sync::Arc;
1148    use std::time::Duration;
1149
1150    use serial_test::serial;
1151
1152    use super::*;
1153
1154    fn initialize() {
1155        lgalloc_set_config(
1156            LgAlloc::new()
1157                .enable()
1158                .with_background_config(BackgroundWorkerConfig {
1159                    interval: Duration::from_secs(1),
1160                    clear_bytes: 4 << 20,
1161                })
1162                .with_path(std::env::temp_dir())
1163                .file_growth_dampener(1),
1164        );
1165    }
1166
1167    struct Wrapper<T> {
1168        handle: MaybeUninit<Handle>,
1169        ptr: NonNull<MaybeUninit<T>>,
1170        cap: usize,
1171    }
1172
1173    unsafe impl<T: Send> Send for Wrapper<T> {}
1174    unsafe impl<T: Sync> Sync for Wrapper<T> {}
1175
1176    impl<T> Wrapper<T> {
1177        fn allocate(capacity: usize) -> Result<Self, AllocError> {
1178            let (ptr, cap, handle) = allocate(capacity)?;
1179            assert!(cap > 0);
1180            let handle = MaybeUninit::new(handle);
1181            Ok(Self { ptr, cap, handle })
1182        }
1183
1184        fn as_slice(&mut self) -> &mut [MaybeUninit<T>] {
1185            unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.cap) }
1186        }
1187    }
1188
1189    impl<T> Drop for Wrapper<T> {
1190        fn drop(&mut self) {
1191            unsafe { deallocate(self.handle.assume_init_read()) };
1192        }
1193    }
1194
1195    #[test]
1196    #[serial]
1197    fn test_readme() -> Result<(), AllocError> {
1198        initialize();
1199
1200        // Allocate memory
1201        let (ptr, cap, handle) = allocate::<u8>(2 << 20)?;
1202        // SAFETY: `allocate` returns a valid memory region and errors otherwise.
1203        let mut vec = ManuallyDrop::new(unsafe { Vec::from_raw_parts(ptr.as_ptr(), 0, cap) });
1204
1205        // Write into region, make sure not to reallocate vector.
1206        vec.extend_from_slice(&[1, 2, 3, 4]);
1207
1208        // We can read from the vector.
1209        assert_eq!(&*vec, &[1, 2, 3, 4]);
1210
1211        // Deallocate after use
1212        deallocate(handle);
1213        Ok(())
1214    }
1215
1216    #[test]
1217    #[serial]
1218    fn test_1() -> Result<(), AllocError> {
1219        initialize();
1220        <Wrapper<u8>>::allocate(4 << 20)?.as_slice()[0] = MaybeUninit::new(1);
1221        Ok(())
1222    }
1223
1224    #[test]
1225    #[serial]
1226    fn test_3() -> Result<(), AllocError> {
1227        initialize();
1228        let until = Arc::new(AtomicBool::new(true));
1229
1230        let inner = || {
1231            let until = Arc::clone(&until);
1232            move || {
1233                let mut i = 0;
1234                let until = &*until;
1235                while until.load(Ordering::Relaxed) {
1236                    i += 1;
1237                    let mut r = <Wrapper<u8>>::allocate(4 << 20).unwrap();
1238                    r.as_slice()[0] = MaybeUninit::new(1);
1239                }
1240                println!("repetitions: {i}");
1241            }
1242        };
1243        let handles = [
1244            std::thread::spawn(inner()),
1245            std::thread::spawn(inner()),
1246            std::thread::spawn(inner()),
1247            std::thread::spawn(inner()),
1248        ];
1249        std::thread::sleep(Duration::from_secs(4));
1250        until.store(false, Ordering::Relaxed);
1251        for handle in handles {
1252            handle.join().unwrap();
1253        }
1254        // std::thread::sleep(Duration::from_secs(600));
1255        Ok(())
1256    }
1257
1258    #[test]
1259    #[serial]
1260    fn test_4() -> Result<(), AllocError> {
1261        initialize();
1262        let until = Arc::new(AtomicBool::new(true));
1263
1264        let inner = || {
1265            let until = Arc::clone(&until);
1266            move || {
1267                let mut i = 0;
1268                let until = &*until;
1269                let batch = 64;
1270                let mut buffer = Vec::with_capacity(batch);
1271                while until.load(Ordering::Relaxed) {
1272                    i += 64;
1273                    buffer.extend((0..batch).map(|_| {
1274                        let mut r = <Wrapper<u8>>::allocate(2 << 20).unwrap();
1275                        r.as_slice()[0] = MaybeUninit::new(1);
1276                        r
1277                    }));
1278                    buffer.clear();
1279                }
1280                println!("repetitions vec: {i}");
1281            }
1282        };
1283        let handles = [
1284            std::thread::spawn(inner()),
1285            std::thread::spawn(inner()),
1286            std::thread::spawn(inner()),
1287            std::thread::spawn(inner()),
1288        ];
1289        std::thread::sleep(Duration::from_secs(4));
1290        until.store(false, Ordering::Relaxed);
1291        for handle in handles {
1292            handle.join().unwrap();
1293        }
1294        std::thread::sleep(Duration::from_secs(1));
1295        let stats = lgalloc_stats();
1296        for size_class in &stats.size_class {
1297            println!("size_class {:?}", size_class);
1298        }
1299        for (size_class, file_stats) in &stats.file {
1300            match file_stats {
1301                Ok(file_stats) => println!("file_stats {size_class} {file_stats:?}"),
1302                Err(e) => eprintln!("error: {e}"),
1303            }
1304        }
1305        Ok(())
1306    }
1307
1308    #[test]
1309    #[serial]
1310    fn leak() -> Result<(), AllocError> {
1311        lgalloc_set_config(&LgAlloc {
1312            enabled: Some(true),
1313            path: Some(std::env::temp_dir()),
1314            ..Default::default()
1315        });
1316        let r = <Wrapper<u8>>::allocate(1000)?;
1317
1318        let thread = std::thread::spawn(move || drop(r));
1319
1320        thread.join().unwrap();
1321        Ok(())
1322    }
1323
1324    #[test]
1325    #[serial]
1326    fn test_zst() -> Result<(), AllocError> {
1327        initialize();
1328        <Wrapper<()>>::allocate(10)?;
1329        Ok(())
1330    }
1331
1332    #[test]
1333    #[serial]
1334    fn test_zero_capacity_zst() -> Result<(), AllocError> {
1335        initialize();
1336        <Wrapper<()>>::allocate(0)?;
1337        Ok(())
1338    }
1339
1340    #[test]
1341    #[serial]
1342    fn test_zero_capacity_nonzst() -> Result<(), AllocError> {
1343        initialize();
1344        assert_eq!(allocate::<u8>(0)?.1, 0);
1345        Ok(())
1346    }
1347
1348    #[test]
1349    #[serial]
1350    fn test_stats() -> Result<(), AllocError> {
1351        initialize();
1352        let (_ptr, _cap, handle) = allocate::<usize>(1024)?;
1353        deallocate(handle);
1354
1355        let stats = lgalloc_stats();
1356
1357        assert!(!stats.size_class.is_empty());
1358
1359        Ok(())
1360    }
1361
1362    #[test]
1363    #[serial]
1364    fn test_disable() {
1365        lgalloc_set_config(&*LgAlloc::new().disable());
1366        assert!(matches!(allocate::<u8>(1024), Err(AllocError::Disabled)));
1367    }
1368}