lgalloc/
lib.rs

1//! A size-classed file-backed large object allocator.
2//!
3//! This library contains types to allocate memory outside the heap,
4//! supporting power-of-two object sizes. Each size class has its own
5//! memory pool.
6//!
7//! # Safety
8//!
9//! This library is inherently unsafe: it uses `unsafe` code and interacts directly
10//! with libc, including Linux-specific extensions.
11//!
12//! The library relies on memory-mapped files. Users of this library must not fork the
13//! process: the child would share the same mappings, causing undefined behavior because
14//! the mutable pointers would no longer be unique. Unfortunately, there is no way to tell
15//! the memory subsystem that the shared mappings must not be inherited.
16//!
17//! Clients must not lock pages (`mlock`); if they do, they must unlock the pages before
18//! returning them to lgalloc.
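//!
//! # Example
//!
//! A minimal usage sketch, mirroring the `test_readme` test in this crate. It assumes lgalloc
//! has not been configured elsewhere and that a writable temporary directory exists; the block
//! is marked `ignore` because it is illustrative rather than a compiled doctest.
//!
//! ```ignore
//! use std::mem::ManuallyDrop;
//!
//! // Enable lgalloc and point it at a directory for its backing files.
//! lgalloc::lgalloc_set_config(lgalloc::LgAlloc::new().enable().with_path(std::env::temp_dir()));
//!
//! // Allocate a region with space for at least 2 MiB worth of `u8`.
//! let (ptr, cap, handle) = lgalloc::allocate::<u8>(2 << 20).expect("size class exists");
//!
//! // Wrap the region in a vector that we promise never to reallocate or drop.
//! let mut vec = ManuallyDrop::new(unsafe { Vec::from_raw_parts(ptr.as_ptr(), 0, cap) });
//! vec.extend_from_slice(&[1, 2, 3, 4]);
//! assert_eq!(&*vec, &[1, 2, 3, 4]);
//!
//! // Return the memory to lgalloc; `vec` must not be used afterwards.
//! lgalloc::deallocate(handle);
//! ```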
19
20#![deny(missing_docs)]
21
22use std::cell::RefCell;
23use std::collections::HashMap;
24use std::fs::File;
25use std::marker::PhantomData;
26use std::mem::{take, ManuallyDrop, MaybeUninit};
27use std::ops::{Deref, Range};
28use std::os::fd::{AsFd, AsRawFd};
29use std::path::PathBuf;
30use std::ptr::NonNull;
31use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
32use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
33use std::sync::{Arc, Mutex, OnceLock, RwLock};
34use std::thread::{JoinHandle, ThreadId};
35use std::time::{Duration, Instant};
36
37use crossbeam_deque::{Injector, Steal, Stealer, Worker};
38use memmap2::MmapMut;
39use numa_maps::NumaMap;
40use thiserror::Error;
41
42mod readme {
43    #![doc = include_str!("../README.md")]
44}
45
46/// Handle to describe allocations.
47///
48/// A handle represents a leased allocation, which must be explicitly freed. Otherwise, the caller
49/// permanently leaks the associated memory.
50pub struct Handle {
51    /// The actual pointer.
52    ptr: NonNull<u8>,
53    /// Length of the allocation.
54    len: usize,
55}
56
57unsafe impl Send for Handle {}
58unsafe impl Sync for Handle {}
59
60#[allow(clippy::len_without_is_empty)]
61impl Handle {
62    /// Construct a new handle from a region of memory
63    fn new(ptr: NonNull<u8>, len: usize) -> Self {
64        Self { ptr, len }
65    }
66
67    /// Construct a dangling handle, which is only suitable for zero-sized types.
68    fn dangling() -> Self {
69        Self {
70            ptr: NonNull::dangling(),
71            len: 0,
72        }
73    }
74
75    fn is_dangling(&self) -> bool {
76        self.ptr == NonNull::dangling()
77    }
78
79    /// Length of the memory area in bytes.
80    fn len(&self) -> usize {
81        self.len
82    }
83
84    /// Pointer to memory.
85    fn as_non_null(&self) -> NonNull<u8> {
86        self.ptr
87    }
88
89    /// Indicate that the memory is not in use and that the OS can recycle it.
90    fn clear(&mut self) -> std::io::Result<()> {
91        // SAFETY: `MADV_DONTNEED_STRATEGY` guaranteed to be a valid argument.
92        unsafe { self.madvise(MADV_DONTNEED_STRATEGY) }
93    }
94
95    /// Indicate that the memory is not in use and that the OS can recycle it, using `MADV_DONTNEED` directly.
96    fn fast_clear(&mut self) -> std::io::Result<()> {
97        // SAFETY: `libc::MADV_DONTNEED` documented to be a valid argument.
98        unsafe { self.madvise(libc::MADV_DONTNEED) }
99    }
100
101    /// Call `madvise` on the memory region. Unsafe because `advice` is passed verbatim.
102    unsafe fn madvise(&self, advice: libc::c_int) -> std::io::Result<()> {
103        // SAFETY: Calling into `madvise`:
104        // * The ptr is page-aligned by construction.
105        // * The ptr + length is page-aligned by construction (not required but surprising otherwise)
106        // * Mapped shared and writable (for MADV_REMOVE),
107        // * Pages not locked.
108        // * The caller is responsible for passing a valid `advice` parameter.
109        let ptr = self.as_non_null().as_ptr().cast();
110        let ret = unsafe { libc::madvise(ptr, self.len, advice) };
111        if ret != 0 {
112            let err = std::io::Error::last_os_error();
113            eprintln!("madvise failed: {ret} {err:?}",);
114            return Err(err);
115        }
116        Ok(())
117    }
118}
119
120/// Initial file size in bytes (32 MiB).
121const INITIAL_SIZE: usize = 32 << 20;
122
123/// Range of valid size classes.
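///
/// Size class `n` covers allocations of `1 << n` bytes, so the supported region sizes range
/// from 1 KiB (class 10) to 64 GiB (class 36).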
124pub const VALID_SIZE_CLASS: Range<usize> = 10..37;
125
126/// Strategy to indicate that the OS can reclaim pages
127// TODO: On Linux, we want to use MADV_REMOVE, but that's only supported
128// on file systems that support FALLOC_FL_PUNCH_HOLE. We should check
129// the return value and retry with MADV_DONTNEED on EOPNOTSUPP.
130#[cfg(target_os = "linux")]
131const MADV_DONTNEED_STRATEGY: libc::c_int = libc::MADV_REMOVE;
132
133#[cfg(not(target_os = "linux"))]
134const MADV_DONTNEED_STRATEGY: libc::c_int = libc::MADV_DONTNEED;
135
136type PhantomUnsyncUnsend<T> = PhantomData<*mut T>;
137
138/// Allocation errors
139#[derive(Error, Debug)]
140pub enum AllocError {
141    /// IO error, unrecoverable
142    #[error("I/O error")]
143    Io(#[from] std::io::Error),
144    /// Out of memory, meaning that the pool is exhausted.
145    #[error("Out of memory")]
146    OutOfMemory,
147    /// Size class too large or small
148    #[error("Invalid size class")]
149    InvalidSizeClass(usize),
150    /// Allocator disabled
151    #[error("Disabled by configuration")]
152    Disabled,
153    /// Failed to allocate memory that suits alignment properties.
154    #[error("Memory unsuitable for requested alignment")]
155    UnalignedMemory,
156}
157
158impl AllocError {
159    /// Check if this error is [`AllocError::Disabled`].
160    #[must_use]
161    pub fn is_disabled(&self) -> bool {
162        matches!(self, AllocError::Disabled)
163    }
164}
165
166/// Abstraction over size classes.
167#[derive(Clone, Copy)]
168struct SizeClass(usize);
169
170impl SizeClass {
171    const fn new_unchecked(value: usize) -> Self {
172        Self(value)
173    }
174
175    const fn index(self) -> usize {
176        self.0 - VALID_SIZE_CLASS.start
177    }
178
179    /// The size in bytes of this size class.
180    const fn byte_size(self) -> usize {
181        1 << self.0
182    }
183
184    const fn from_index(index: usize) -> Self {
185        Self(index + VALID_SIZE_CLASS.start)
186    }
187
188    /// Obtain a size class from a size in bytes.
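    ///
    /// For example, a request for 3 MiB rounds up to the next power of two, 4 MiB, which is
    /// size class 22 (`1 << 22` bytes).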
189    fn from_byte_size(byte_size: usize) -> Result<Self, AllocError> {
190        let class = byte_size.next_power_of_two().trailing_zeros() as usize;
191        class.try_into()
192    }
193
194    const fn from_byte_size_unchecked(byte_size: usize) -> Self {
195        Self::new_unchecked(byte_size.next_power_of_two().trailing_zeros() as usize)
196    }
197}
198
199impl TryFrom<usize> for SizeClass {
200    type Error = AllocError;
201
202    fn try_from(value: usize) -> Result<Self, Self::Error> {
203        if VALID_SIZE_CLASS.contains(&value) {
204            Ok(SizeClass(value))
205        } else {
206            Err(AllocError::InvalidSizeClass(value))
207        }
208    }
209}
210
211#[derive(Default, Debug)]
212struct AllocStats {
213    allocations: AtomicU64,
214    slow_path: AtomicU64,
215    refill: AtomicU64,
216    deallocations: AtomicU64,
217    clear_eager: AtomicU64,
218    clear_slow: AtomicU64,
219}
220
221/// Handle to the shared global state.
222static INJECTOR: OnceLock<GlobalStealer> = OnceLock::new();
223
224/// Switch to turn lgalloc on or off. Off by default.
225static LGALLOC_ENABLED: AtomicBool = AtomicBool::new(false);
226
227/// Enable eager returning of memory. Off by default.
228static LGALLOC_EAGER_RETURN: AtomicBool = AtomicBool::new(false);
229
230/// Dampener for the file growth rate. 0 corresponds to doubling; in general, `n` corresponds to a growth factor of `1 + 1/(n+1)`.
231///
232/// Setting this to 0 results in creating files with doubling capacity.
233/// Larger numbers result in more conservative approaches that create more files.
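///
/// For example, a dampener of 1 grows each new backing file to roughly 1.5 times the capacity
/// of the previous one (but by at least the initial capacity), instead of doubling it.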
234static LGALLOC_FILE_GROWTH_DAMPENER: AtomicUsize = AtomicUsize::new(0);
235
236/// The size of allocations to retain locally, per thread and size class.
237static LOCAL_BUFFER_BYTES: AtomicUsize = AtomicUsize::new(32 << 20);
238
239/// Type maintaining the global state for each size class.
240struct GlobalStealer {
241    /// State for each size class. The entry at index `x` handles size class
242    /// `VALID_SIZE_CLASS.start + x`, i.e., areas of size `1 << (VALID_SIZE_CLASS.start + x)`.
243    size_classes: Vec<SizeClassState>,
244    /// Path to store files
245    path: RwLock<Option<PathBuf>>,
246    /// Shared token to access background thread.
247    background_sender: Mutex<Option<(JoinHandle<()>, Sender<BackgroundWorkerConfig>)>>,
248}
249
250/// Per-size-class state
251#[derive(Default)]
252struct SizeClassState {
253    /// Handle to memory-mapped regions.
254    ///
255    /// We must never dereference the memory-mapped regions stored here.
256    areas: RwLock<Vec<ManuallyDrop<(File, MmapMut)>>>,
257    /// Injector to distribute memory globally.
258    injector: Injector<Handle>,
259    /// Injector to distribute cleaned (already freed) memory globally.
260    clean_injector: Injector<Handle>,
261    /// Slow-path lock to refill pool.
262    lock: Mutex<()>,
263    /// Thread stealers to allow all participating threads to steal memory.
264    stealers: RwLock<HashMap<ThreadId, PerThreadState<Handle>>>,
265    /// Summed stats for terminated threads.
266    alloc_stats: AllocStats,
267}
268
269impl GlobalStealer {
270    /// Obtain the shared global state.
271    fn get_static() -> &'static Self {
272        INJECTOR.get_or_init(Self::new)
273    }
274
275    /// Obtain the per-size-class global state.
276    fn get_size_class(&self, size_class: SizeClass) -> &SizeClassState {
277        &self.size_classes[size_class.index()]
278    }
279
280    fn new() -> Self {
281        let mut size_classes = Vec::with_capacity(VALID_SIZE_CLASS.len());
282
283        for _ in VALID_SIZE_CLASS {
284            size_classes.push(SizeClassState::default());
285        }
286
287        Self {
288            size_classes,
289            path: RwLock::default(),
290            background_sender: Mutex::default(),
291        }
292    }
293}
294
295impl Drop for GlobalStealer {
296    fn drop(&mut self) {
297        take(&mut self.size_classes);
298    }
299}
300
301struct PerThreadState<T> {
302    stealer: Stealer<T>,
303    alloc_stats: Arc<AllocStats>,
304}
305
306/// Per-thread state, sharded by size class.
307struct ThreadLocalStealer {
308    /// Per-size-class state
309    size_classes: Vec<LocalSizeClass>,
310    _phantom: PhantomUnsyncUnsend<Self>,
311}
312
313impl ThreadLocalStealer {
314    fn new() -> Self {
315        let thread_id = std::thread::current().id();
316        let size_classes = VALID_SIZE_CLASS
317            .map(|size_class| LocalSizeClass::new(SizeClass::new_unchecked(size_class), thread_id))
318            .collect();
319        Self {
320            size_classes,
321            _phantom: PhantomData,
322        }
323    }
324
325    /// Allocate a memory region from a specific size class.
326    ///
327    /// Returns [`AllocError::Disabled`] if lgalloc is not enabled. Returns other error types
328    /// if out of memory, or if an internal operation fails.
329    fn allocate(&self, size_class: SizeClass) -> Result<Handle, AllocError> {
330        if !LGALLOC_ENABLED.load(Ordering::Relaxed) {
331            return Err(AllocError::Disabled);
332        }
333        self.size_classes[size_class.index()].get_with_refill()
334    }
335
336    /// Return memory to the allocator. Must have been obtained through [`allocate`].
337    fn deallocate(&self, mem: Handle) {
338        let size_class = SizeClass::from_byte_size_unchecked(mem.len());
339
340        self.size_classes[size_class.index()].push(mem);
341    }
342}
343
344thread_local! {
345    static WORKER: RefCell<ThreadLocalStealer> = RefCell::new(ThreadLocalStealer::new());
346}
347
348/// Per-thread, per-size-class state
349///
350/// # Safety
351///
352/// We store parts of areas in this struct. Leaking this struct leaks the areas, which is safe
353/// because we will never try to access or reclaim them.
354struct LocalSizeClass {
355    /// Local memory queue.
356    worker: Worker<Handle>,
357    /// Size class we're covering
358    size_class: SizeClass,
359    /// Handle to global size class state
360    size_class_state: &'static SizeClassState,
361    /// Owning thread's ID
362    thread_id: ThreadId,
363    /// Shared statistics maintained by this thread.
364    stats: Arc<AllocStats>,
365    /// Phantom data to prevent sending the type across thread boundaries.
366    _phantom: PhantomUnsyncUnsend<Self>,
367}
368
369impl LocalSizeClass {
370    /// Construct a new local size class state. Registers the worker with the global state.
371    fn new(size_class: SizeClass, thread_id: ThreadId) -> Self {
372        let worker = Worker::new_lifo();
373        let stealer = GlobalStealer::get_static();
374        let size_class_state = stealer.get_size_class(size_class);
375
376        let stats = Arc::new(AllocStats::default());
377
378        let mut lock = size_class_state.stealers.write().unwrap();
379        lock.insert(
380            thread_id,
381            PerThreadState {
382                stealer: worker.stealer(),
383                alloc_stats: Arc::clone(&stats),
384            },
385        );
386
387        Self {
388            worker,
389            size_class,
390            size_class_state,
391            thread_id,
392            stats,
393            _phantom: PhantomData,
394        }
395    }
396
397    /// Get a memory area. Tries to get a region from the local cache before obtaining memory
398    /// from the global state. As a last resort, steals memory from other workers.
399    ///
400    /// Returns [`AllocError::OutOfMemory`] if all pools are empty.
401    #[inline]
402    fn get(&self) -> Result<Handle, AllocError> {
403        self.worker
404            .pop()
405            .or_else(|| {
406                std::iter::repeat_with(|| {
407                    // The loop tries to obtain memory in the following order:
408                    // 1. Memory from the global state,
409                    // 2. Memory from the global cleaned state,
410                    // 3. Memory from other threads.
411                    let limit = 1.max(
412                        LOCAL_BUFFER_BYTES.load(Ordering::Relaxed)
413                            / self.size_class.byte_size()
414                            / 2,
415                    );
416
417                    self.size_class_state
418                        .injector
419                        .steal_batch_with_limit_and_pop(&self.worker, limit)
420                        .or_else(|| {
421                            self.size_class_state
422                                .clean_injector
423                                .steal_batch_with_limit_and_pop(&self.worker, limit)
424                        })
425                        .or_else(|| {
426                            self.size_class_state
427                                .stealers
428                                .read()
429                                .unwrap()
430                                .values()
431                                .map(|state| state.stealer.steal())
432                                .collect()
433                        })
434                })
435                .find(|s| !s.is_retry())
436                .and_then(Steal::success)
437            })
438            .ok_or(AllocError::OutOfMemory)
439    }
440
441    /// Like [`Self::get()`], but tries to refill the pool if it is empty.
442    fn get_with_refill(&self) -> Result<Handle, AllocError> {
443        self.stats.allocations.fetch_add(1, Ordering::Relaxed);
444        // Fast-path: Get non-blocking
445        match self.get() {
446            Err(AllocError::OutOfMemory) => {
447                self.stats.slow_path.fetch_add(1, Ordering::Relaxed);
448                // Get a slow-path lock
449                let _lock = self.size_class_state.lock.lock().unwrap();
450                // Try again because another thread might have refilled already
451                if let Ok(mem) = self.get() {
452                    return Ok(mem);
453                }
454                self.try_refill_and_get()
455            }
456            r => r,
457        }
458    }
459
460    /// Recycle memory. Stores it locally or forwards it to the global state.
461    fn push(&self, mut mem: Handle) {
462        debug_assert_eq!(mem.len(), self.size_class.byte_size());
463        self.stats.deallocations.fetch_add(1, Ordering::Relaxed);
464        if self.worker.len()
465            >= LOCAL_BUFFER_BYTES.load(Ordering::Relaxed) / self.size_class.byte_size()
466        {
467            if LGALLOC_EAGER_RETURN.load(Ordering::Relaxed) {
468                self.stats.clear_eager.fetch_add(1, Ordering::Relaxed);
469                mem.fast_clear().expect("clearing successful");
470            }
471            self.size_class_state.injector.push(mem);
472        } else {
473            self.worker.push(mem);
474        }
475    }
476
477    /// Refill the memory pool, and get one area.
478    ///
479    /// Returns an error if the memory pool cannot be refilled.
480    fn try_refill_and_get(&self) -> Result<Handle, AllocError> {
481        self.stats.refill.fetch_add(1, Ordering::Relaxed);
482        let mut stash = self.size_class_state.areas.write().unwrap();
483
484        let initial_capacity = std::cmp::max(1, INITIAL_SIZE / self.size_class.byte_size());
485
486        let last_capacity =
487            stash.iter().last().map_or(0, |mmap| mmap.1.len()) / self.size_class.byte_size();
488        let growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.load(Ordering::Relaxed);
489        // We would like to grow the file capacity by a factor of `1+1/(growth_dampener+1)`,
490        // but at least by `initial_capacity`.
491        let next_capacity = last_capacity
492            + std::cmp::max(
493                initial_capacity,
494                last_capacity / (growth_dampener.saturating_add(1)),
495            );
496
497        let next_byte_len = next_capacity * self.size_class.byte_size();
498        let (file, mut mmap) = Self::init_file(next_byte_len)?;
499
500        // SAFETY: Memory region initialized, so pointers to it are valid.
501        let mut chunks = mmap
502            .as_mut()
503            .chunks_exact_mut(self.size_class.byte_size())
504            .map(|chunk| unsafe { NonNull::new_unchecked(chunk.as_mut_ptr()) });
505
506        // Capture first region to return immediately.
507        let ptr = chunks.next().expect("At least one chunk allocated.");
508        let mem = Handle::new(ptr, self.size_class.byte_size());
509
510        // Stash remaining in the injector.
511        for ptr in chunks {
512            self.size_class_state
513                .clean_injector
514                .push(Handle::new(ptr, self.size_class.byte_size()));
515        }
516
517        stash.push(ManuallyDrop::new((file, mmap)));
518        Ok(mem)
519    }
520
521    /// Allocate and map a file of size `byte_len`. Returns the file and its memory mapping, or an
522    /// error if the allocation fails.
523    fn init_file(byte_len: usize) -> Result<(File, MmapMut), AllocError> {
524        let path = GlobalStealer::get_static().path.read().unwrap().clone();
525        let Some(path) = path else {
526            return Err(AllocError::Io(std::io::Error::from(
527                std::io::ErrorKind::NotFound,
528            )));
529        };
530        let file = tempfile::tempfile_in(path)?;
531        let fd = file.as_fd().as_raw_fd();
532        // SAFETY: Calling ftruncate on the file, which we just created.
533        unsafe {
534            let ret = libc::ftruncate(fd, libc::off_t::try_from(byte_len).expect("Must fit"));
535            if ret != 0 {
536                return Err(std::io::Error::last_os_error().into());
537            }
538        }
539        // SAFETY: We only map `file` once, and never share it with other processes.
540        let mmap = unsafe { memmap2::MmapOptions::new().map_mut(&file)? };
541        assert_eq!(mmap.len(), byte_len);
542        Ok((file, mmap))
543    }
544}
545
546impl Drop for LocalSizeClass {
547    fn drop(&mut self) {
548        // Remove state associated with thread
549        if let Ok(mut lock) = self.size_class_state.stealers.write() {
550            lock.remove(&self.thread_id);
551        }
552
553        // Send memory back to global state
554        while let Some(mem) = self.worker.pop() {
555            self.size_class_state.injector.push(mem);
556        }
557
558        let ordering = Ordering::Relaxed;
559
560        // Update global metrics by moving all worker-local metrics to global state.
561        self.size_class_state
562            .alloc_stats
563            .allocations
564            .fetch_add(self.stats.allocations.load(ordering), ordering);
565        let global_stats = &self.size_class_state.alloc_stats;
566        global_stats
567            .refill
568            .fetch_add(self.stats.refill.load(ordering), ordering);
569        global_stats
570            .slow_path
571            .fetch_add(self.stats.slow_path.load(ordering), ordering);
572        global_stats
573            .deallocations
574            .fetch_add(self.stats.deallocations.load(ordering), ordering);
575        global_stats
576            .clear_slow
577            .fetch_add(self.stats.clear_slow.load(ordering), ordering);
578        global_stats
579            .clear_eager
580            .fetch_add(self.stats.clear_eager.load(ordering), ordering);
581    }
582}
583
584/// Access the per-thread context.
585fn thread_context<R, F: FnOnce(&ThreadLocalStealer) -> R>(f: F) -> R {
586    WORKER.with(|cell| f(&cell.borrow()))
587}
588
589/// Allocate a memory area suitable to hold `capacity` consecutive elements of `T`.
590///
591/// On success, returns a pointer, a capacity in units of `T`, and a handle; returns an error
592/// otherwise. The capacity can be larger than requested.
593///
594/// The memory must be freed using [`deallocate`], otherwise the memory leaks. The memory can be freed on a different thread.
595///
596/// # Errors
597///
598/// `allocate` returns an error if the capacity cannot be supported by one of the size classes,
599/// if the alignment requirements of `T` cannot be fulfilled, if no more memory can be
600/// obtained from the system, or if any syscall fails.
601///
602/// The function also returns an error if lgalloc is disabled.
603///
604/// In the case of an error, no memory is allocated, and we maintain the internal
605/// invariants of the allocator.
606///
607/// # Panics
608///
609/// The function can panic on internal errors, specifically when an allocation returned
610/// an unexpected size. In this case, we do not maintain the allocator invariants
611/// and the caller should abort the process.
612///
613/// Panics if the thread local variable has been dropped, see [`std::thread::LocalKey`]
614/// for details.
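///
/// # Example
///
/// A sketch of the intended call pattern, modeled on this crate's tests. It assumes lgalloc has
/// been enabled and given a path via [`lgalloc_set_config`]; the block is marked `ignore` because
/// it is illustrative rather than a compiled doctest.
///
/// ```ignore
/// use std::mem::MaybeUninit;
///
/// // Request space for 1024 `u64` values; the returned capacity may be larger.
/// let (ptr, cap, handle) = lgalloc::allocate::<u64>(1024).expect("allocation succeeds");
/// assert!(cap >= 1024);
///
/// // SAFETY: the region is valid for `cap` elements and satisfies `u64`'s alignment.
/// let slice: &mut [MaybeUninit<u64>] =
///     unsafe { std::slice::from_raw_parts_mut(ptr.as_ptr().cast(), cap) };
/// slice[0] = MaybeUninit::new(42);
///
/// // Hand the memory back; `slice` must not be touched afterwards.
/// lgalloc::deallocate(handle);
/// ```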
615pub fn allocate<T>(capacity: usize) -> Result<(NonNull<T>, usize, Handle), AllocError> {
616    if std::mem::size_of::<T>() == 0 {
617        return Ok((NonNull::dangling(), usize::MAX, Handle::dangling()));
618    } else if capacity == 0 {
619        return Ok((NonNull::dangling(), 0, Handle::dangling()));
620    }
621
622    // Round up to at least a page.
623    let byte_len = std::cmp::max(page_size::get(), std::mem::size_of::<T>() * capacity);
624    // Rounding up to at least the page size, combined with power-of-two size classes,
625    // means every allocation is a whole multiple of the page size.
626    let size_class = SizeClass::from_byte_size(byte_len)?;
627
628    let handle = thread_context(|s| s.allocate(size_class))?;
629    debug_assert_eq!(handle.len(), size_class.byte_size());
630    let ptr: NonNull<T> = handle.as_non_null().cast();
631    // Memory region should be page-aligned, which we assume to be larger than any alignment
632    // we might encounter. If this is not the case, bail out.
633    if ptr.as_ptr().align_offset(std::mem::align_of::<T>()) != 0 {
634        thread_context(move |s| s.deallocate(handle));
635        return Err(AllocError::UnalignedMemory);
636    }
637    let actual_capacity = handle.len() / std::mem::size_of::<T>();
638    Ok((ptr, actual_capacity, handle))
639}
640
641/// Free the memory referenced by `handle`, which has been obtained from [`allocate`].
642///
643/// This function cannot fail. The caller must not access the memory after freeing it. The caller is responsible
644/// for dropping/forgetting data.
645///
646/// # Panics
647///
648/// Panics if the thread local variable has been dropped, see [`std::thread::LocalKey`]
649/// for details.
650pub fn deallocate(handle: Handle) {
651    if handle.is_dangling() {
652        return;
653    }
654    thread_context(|s| s.deallocate(handle));
655}
656
657/// A background worker that performs periodic tasks.
658struct BackgroundWorker {
659    config: BackgroundWorkerConfig,
660    receiver: Receiver<BackgroundWorkerConfig>,
661    global_stealer: &'static GlobalStealer,
662    worker: Worker<Handle>,
663}
664
665impl BackgroundWorker {
666    fn new(receiver: Receiver<BackgroundWorkerConfig>) -> Self {
667        let config = BackgroundWorkerConfig {
668            interval: Duration::MAX,
669            ..Default::default()
670        };
671        let global_stealer = GlobalStealer::get_static();
672        let worker = Worker::new_fifo();
673        Self {
674            config,
675            receiver,
676            global_stealer,
677            worker,
678        }
679    }
680
681    fn run(&mut self) {
682        let mut next_cleanup: Option<Instant> = None;
683        loop {
684            let timeout = next_cleanup.map_or(Duration::MAX, |next_cleanup| {
685                next_cleanup.saturating_duration_since(Instant::now())
686            });
687            match self.receiver.recv_timeout(timeout) {
688                Ok(config) => {
689                    self.config = config;
690                    next_cleanup = None;
691                }
692                Err(RecvTimeoutError::Disconnected) => break,
693                Err(RecvTimeoutError::Timeout) => {
694                    self.maintenance();
695                }
696            }
697            next_cleanup = next_cleanup
698                .unwrap_or_else(Instant::now)
699                .checked_add(self.config.interval);
700        }
701    }
702
703    fn maintenance(&self) {
704        for (index, size_class_state) in self.global_stealer.size_classes.iter().enumerate() {
705            let size_class = SizeClass::from_index(index);
706            let count = self.clear(size_class, size_class_state, &self.worker);
707            size_class_state
708                .alloc_stats
709                .clear_slow
710                .fetch_add(count.try_into().expect("must fit"), Ordering::Relaxed);
711        }
712    }
713
714    fn clear(
715        &self,
716        size_class: SizeClass,
717        state: &SizeClassState,
718        worker: &Worker<Handle>,
719    ) -> usize {
720        // Clear batch size in regions: `clear_bytes` rounded up to whole regions.
721        let byte_size = size_class.byte_size();
722        let mut limit = (self.config.clear_bytes + byte_size - 1) / byte_size;
723        let mut count = 0;
724        let mut steal = Steal::Retry;
725        while limit > 0 && !steal.is_empty() {
726            steal = std::iter::repeat_with(|| state.injector.steal_batch_with_limit(worker, limit))
727                .find(|s| !s.is_retry())
728                .unwrap_or(Steal::Empty);
729            while let Some(mut mem) = worker.pop() {
730                match mem.clear() {
731                    Ok(()) => count += 1,
732                    Err(e) => panic!("Syscall failed: {e:?}"),
733                }
734                state.clean_injector.push(mem);
735                limit -= 1;
736            }
737        }
738        count
739    }
740}
741
742/// Set or update the configuration for lgalloc.
743///
744/// The function accepts a configuration, which is then applied to lgalloc. It allows clients to
745/// change the path where area files reside, and to change the configuration of the background task.
746///
747/// Updating the area path only affects new allocations; existing allocations are not moved to
748/// the new path.
749///
750/// Updating the background thread configuration eventually applies the new configuration to the
751/// running thread, or starts the background worker.
752///
753/// # Panics
754///
755/// Panics if the internal state of lgalloc is corrupted.
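///
/// # Example
///
/// A configuration sketch along the lines of this crate's tests; the path, interval, and byte
/// counts are illustrative values, and the block is marked `ignore` rather than run as a doctest.
///
/// ```ignore
/// use std::time::Duration;
///
/// lgalloc::lgalloc_set_config(
///     lgalloc::LgAlloc::new()
///         .enable()
///         .with_path(std::env::temp_dir())
///         .with_background_config(lgalloc::BackgroundWorkerConfig {
///             interval: Duration::from_secs(1),
///             clear_bytes: 32 << 20,
///         })
///         .file_growth_dampener(1),
/// );
/// ```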
756pub fn lgalloc_set_config(config: &LgAlloc) {
757    let stealer = GlobalStealer::get_static();
758
759    if let Some(enabled) = &config.enabled {
760        LGALLOC_ENABLED.store(*enabled, Ordering::Relaxed);
761    }
762
763    if let Some(eager_return) = &config.eager_return {
764        LGALLOC_EAGER_RETURN.store(*eager_return, Ordering::Relaxed);
765    }
766
767    if let Some(path) = &config.path {
768        *stealer.path.write().unwrap() = Some(path.clone());
769    }
770
771    if let Some(file_growth_dampener) = &config.file_growth_dampener {
772        LGALLOC_FILE_GROWTH_DAMPENER.store(*file_growth_dampener, Ordering::Relaxed);
773    }
774
775    if let Some(local_buffer_bytes) = &config.local_buffer_bytes {
776        LOCAL_BUFFER_BYTES.store(*local_buffer_bytes, Ordering::Relaxed);
777    }
778
779    if let Some(config) = config.background_config.clone() {
780        let mut lock = stealer.background_sender.lock().unwrap();
781
782        let config = if let Some((_, sender)) = &*lock {
783            match sender.send(config) {
784                Ok(()) => None,
785                Err(err) => Some(err.0),
786            }
787        } else {
788            Some(config)
789        };
790        if let Some(config) = config {
791            let (sender, receiver) = std::sync::mpsc::channel();
792            let mut worker = BackgroundWorker::new(receiver);
793            let join_handle = std::thread::Builder::new()
794                .name("lgalloc-0".to_string())
795                .spawn(move || worker.run())
796                .expect("thread started successfully");
797            sender.send(config).expect("Receiver exists");
798            *lock = Some((join_handle, sender));
799        }
800    }
801}
802
803/// Configuration for lgalloc's background worker.
804#[derive(Default, Debug, Clone, Eq, PartialEq)]
805pub struct BackgroundWorkerConfig {
806    /// How frequently the background worker should tick
807    pub interval: Duration,
808    /// How many bytes to clear per size class.
809    pub clear_bytes: usize,
810}
811
812/// Lgalloc configuration
813#[derive(Default, Clone, Eq, PartialEq)]
814pub struct LgAlloc {
815    /// Whether the allocator is enabled or not.
816    pub enabled: Option<bool>,
817    /// Path where files reside.
818    pub path: Option<PathBuf>,
819    /// Configuration of the background worker.
820    pub background_config: Option<BackgroundWorkerConfig>,
821    /// Whether to return physical memory on deallocate
822    pub eager_return: Option<bool>,
823    /// Dampener for the file growth rate. 0 corresponds to doubling; in general, `n` corresponds to a growth factor of `1 + 1/(n+1)`.
824    pub file_growth_dampener: Option<usize>,
825    /// Size of the per-thread per-size class cache, in bytes.
826    pub local_buffer_bytes: Option<usize>,
827}
828
829impl LgAlloc {
830    /// Construct a new configuration. All values are initialized to their default (None) values.
831    #[must_use]
832    pub fn new() -> Self {
833        Self::default()
834    }
835
836    /// Enable lgalloc globally.
837    pub fn enable(&mut self) -> &mut Self {
838        self.enabled = Some(true);
839        self
840    }
841
842    /// Disable lgalloc globally.
843    pub fn disable(&mut self) -> &mut Self {
844        self.enabled = Some(false);
845        self
846    }
847
848    /// Set the background worker configuration.
849    pub fn with_background_config(&mut self, config: BackgroundWorkerConfig) -> &mut Self {
850        self.background_config = Some(config);
851        self
852    }
853
854    /// Set the area file path.
855    pub fn with_path(&mut self, path: PathBuf) -> &mut Self {
856        self.path = Some(path);
857        self
858    }
859
860    /// Set whether memory is eagerly returned to the OS.
861    pub fn eager_return(&mut self, eager_return: bool) -> &mut Self {
862        self.eager_return = Some(eager_return);
863        self
864    }
865
866    /// Set the file growth dampener.
867    pub fn file_growth_dampener(&mut self, file_growth_dampener: usize) -> &mut Self {
868        self.file_growth_dampener = Some(file_growth_dampener);
869        self
870    }
871
872    /// Set the local buffer size.
873    pub fn local_buffer_bytes(&mut self, local_buffer_bytes: usize) -> &mut Self {
874        self.local_buffer_bytes = Some(local_buffer_bytes);
875        self
876    }
877}
878
879/// Determine global statistics per size class
880///
881/// Note that this function takes read locks on various structures.
882///
883/// # Panics
884///
885/// Panics if the internal state of lgalloc is corrupted.
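///
/// # Example
///
/// A sketch of inspecting the returned statistics, similar to what the tests in this crate print;
/// marked `ignore` rather than run as a doctest.
///
/// ```ignore
/// let stats = lgalloc::lgalloc_stats();
/// for size_class in &stats.size_class {
///     println!(
///         "size class {}: {} areas, {} free regions",
///         size_class.size_class, size_class.areas, size_class.free_regions
///     );
/// }
/// match &stats.file_stats {
///     Ok(file_stats) => {
///         for file in file_stats {
///             println!(
///                 "size class {}: {} bytes on disk",
///                 file.size_class, file.allocated_size
///             );
///         }
///     }
///     Err(err) => eprintln!("file stats unavailable: {err}"),
/// }
/// ```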
886pub fn lgalloc_stats() -> LgAllocStats {
887    let mut numa_map = NumaMap::from_file("/proc/self/numa_maps");
888
889    let global = GlobalStealer::get_static();
890
891    let mut size_classes = Vec::with_capacity(global.size_classes.len());
892    let mut file_stats = Vec::new();
893
894    for (index, size_class_state) in global.size_classes.iter().enumerate() {
895        let size_class = SizeClass::from_index(index);
896
897        let areas_lock = size_class_state.areas.read().unwrap();
898
899        let areas = areas_lock.len();
900        if areas == 0 {
901            continue;
902        }
903
904        let size_class = size_class.byte_size();
905        let area_total_bytes = areas_lock.iter().map(|area| area.1.len()).sum();
906        let global_regions = size_class_state.injector.len();
907        let clean_regions = size_class_state.clean_injector.len();
908        let stealers = size_class_state.stealers.read().unwrap();
909        let mut thread_regions = 0;
910        let mut allocations = 0;
911        let mut deallocations = 0;
912        let mut refill = 0;
913        let mut slow_path = 0;
914        let mut clear_eager_total = 0;
915        let mut clear_slow_total = 0;
916        for thread_state in stealers.values() {
917            thread_regions += thread_state.stealer.len();
918            let thread_stats = &*thread_state.alloc_stats;
919            allocations += thread_stats.allocations.load(Ordering::Relaxed);
920            deallocations += thread_stats.deallocations.load(Ordering::Relaxed);
921            refill += thread_stats.refill.load(Ordering::Relaxed);
922            slow_path += thread_stats.slow_path.load(Ordering::Relaxed);
923            clear_eager_total += thread_stats.clear_eager.load(Ordering::Relaxed);
924            clear_slow_total += thread_stats.clear_slow.load(Ordering::Relaxed);
925        }
926
927        let free_regions = thread_regions + global_regions + clean_regions;
928
929        let global_stats = &size_class_state.alloc_stats;
930        allocations += global_stats.allocations.load(Ordering::Relaxed);
931        deallocations += global_stats.deallocations.load(Ordering::Relaxed);
932        refill += global_stats.refill.load(Ordering::Relaxed);
933        slow_path += global_stats.slow_path.load(Ordering::Relaxed);
934        clear_eager_total += global_stats.clear_eager.load(Ordering::Relaxed);
935        clear_slow_total += global_stats.clear_slow.load(Ordering::Relaxed);
936
937        size_classes.push(SizeClassStats {
938            size_class,
939            areas,
940            area_total_bytes,
941            free_regions,
942            global_regions,
943            clean_regions,
944            thread_regions,
945            allocations,
946            deallocations,
947            refill,
948            slow_path,
949            clear_eager_total,
950            clear_slow_total,
951        });
952
953        if let Ok(numa_map) = numa_map.as_mut() {
954            let areas = &areas_lock[..];
955            file_stats.extend(extract_file_stats(
956                size_class,
957                numa_map,
958                areas.iter().map(Deref::deref),
959            ));
960        }
961    }
962
963    LgAllocStats {
964        file_stats: match numa_map {
965            Ok(_) => Ok(file_stats),
966            Err(err) => Err(err),
967        },
968        size_class: size_classes,
969    }
970}
971
972/// Extract per-file statistics for a single size class.
973fn extract_file_stats<'a>(
974    size_class: usize,
975    numa_map: &'a mut NumaMap,
976    areas: impl IntoIterator<Item = &'a (File, MmapMut)> + 'a,
977) -> impl Iterator<Item = FileStats> + 'a {
978    // Normalize numa_maps, and sort by address.
979    for entry in &mut numa_map.ranges {
980        entry.normalize();
981    }
982    numa_map.ranges.sort();
983
984    areas.into_iter().map(move |(file, mmap)| {
985        let (mapped, active, dirty) = {
986            let base = mmap.as_ptr().cast::<()>() as usize;
987            let range = match numa_map
988                .ranges
989                .binary_search_by(|range| range.address.cmp(&base))
990            {
991                Ok(pos) => Some(&numa_map.ranges[pos]),
992                // `numa_maps` only updates periodically, so we might be missing some
993                // expected ranges.
994                Err(_pos) => None,
995            };
996
997            let mut mapped = 0;
998            let mut active = 0;
999            let mut dirty = 0;
1000            for property in range.iter().flat_map(|e| e.properties.iter()) {
1001                match property {
1002                    numa_maps::Property::Dirty(d) => dirty = *d,
1003                    numa_maps::Property::Mapped(m) => mapped = *m,
1004                    numa_maps::Property::Active(a) => active = *a,
1005                    _ => {}
1006                }
1007            }
1008            (mapped, active, dirty)
1009        };
1010
1011        let mut stat: MaybeUninit<libc::stat> = MaybeUninit::uninit();
1012        // SAFETY: File descriptor valid, stat object valid.
1013        let ret = unsafe { libc::fstat(file.as_raw_fd(), stat.as_mut_ptr()) };
1014        let stat = if ret == -1 {
1015            None
1016        } else {
1017            // SAFETY: `stat` is initialized in the fstat non-error case.
1018            Some(unsafe { stat.assume_init_ref() })
1019        };
1020
1021        let (blocks, file_size) = stat.map_or((0, 0), |stat| {
1022            (
1023                stat.st_blocks.try_into().unwrap_or(0),
1024                stat.st_size.try_into().unwrap_or(0),
1025            )
1026        });
1027        FileStats {
1028            size_class,
1029            file_size,
1030            // Documented as multiples of 512
1031            allocated_size: blocks * 512,
1032            mapped,
1033            active,
1034            dirty,
1035        }
1036    })
1037}
1038
1039/// Statistics about lgalloc's internal behavior.
1040#[derive(Debug)]
1041pub struct LgAllocStats {
1042    /// Per size-class statistics.
1043    pub size_class: Vec<SizeClassStats>,
1044    /// Per size-class and backing file statistics.
1045    pub file_stats: Result<Vec<FileStats>, std::io::Error>,
1046}
1047
1048/// Statistics per size class.
1049#[derive(Debug)]
1050pub struct SizeClassStats {
1051    /// Size class in bytes
1052    pub size_class: usize,
1053    /// Number of areas backing a size class.
1054    pub areas: usize,
1055    /// Total number of bytes summed across all areas.
1056    pub area_total_bytes: usize,
1057    /// Free regions
1058    pub free_regions: usize,
1059    /// Clean free regions in the global allocator
1060    pub clean_regions: usize,
1061    /// Regions in the global allocator
1062    pub global_regions: usize,
1063    /// Regions retained in thread-local allocators
1064    pub thread_regions: usize,
1065    /// Total allocations
1066    pub allocations: u64,
1067    /// Total slow-path allocations (globally out of memory)
1068    pub slow_path: u64,
1069    /// Total refills
1070    pub refill: u64,
1071    /// Total deallocations
1072    pub deallocations: u64,
1073    /// Total number of regions returned to the OS via eager reclamation.
1074    pub clear_eager_total: u64,
1075    /// Total number of regions returned to the OS via slow (background) reclamation.
1076    pub clear_slow_total: u64,
1077}
1078
1079/// Statistics per size class and backing file.
1080#[derive(Debug)]
1081pub struct FileStats {
1082    /// The size class in bytes.
1083    pub size_class: usize,
1084    /// The size of the file in bytes.
1085    pub file_size: usize,
1086    /// Size of the file on disk in bytes.
1087    pub allocated_size: usize,
1088    /// Number of mapped bytes, if different from `dirty`. Consult `man 7 numa` for details.
1089    pub mapped: usize,
1090    /// Number of active bytes. Consult `man 7 numa` for details.
1091    pub active: usize,
1092    /// Number of dirty bytes. Consult `man 7 numa` for details.
1093    pub dirty: usize,
1094}
1095
1096#[cfg(test)]
1097mod test {
1098    use std::mem::{ManuallyDrop, MaybeUninit};
1099    use std::ptr::NonNull;
1100    use std::sync::atomic::{AtomicBool, Ordering};
1101    use std::sync::Arc;
1102    use std::time::Duration;
1103
1104    use serial_test::serial;
1105
1106    use super::*;
1107
1108    fn initialize() {
1109        lgalloc_set_config(
1110            LgAlloc::new()
1111                .enable()
1112                .with_background_config(BackgroundWorkerConfig {
1113                    interval: Duration::from_secs(1),
1114                    clear_bytes: 4 << 20,
1115                })
1116                .with_path(std::env::temp_dir())
1117                .file_growth_dampener(1),
1118        );
1119    }
1120
1121    struct Wrapper<T> {
1122        handle: MaybeUninit<Handle>,
1123        ptr: NonNull<MaybeUninit<T>>,
1124        cap: usize,
1125    }
1126
1127    unsafe impl<T: Send> Send for Wrapper<T> {}
1128    unsafe impl<T: Sync> Sync for Wrapper<T> {}
1129
1130    impl<T> Wrapper<T> {
1131        fn allocate(capacity: usize) -> Result<Self, AllocError> {
1132            let (ptr, cap, handle) = allocate(capacity)?;
1133            assert!(cap > 0);
1134            let handle = MaybeUninit::new(handle);
1135            Ok(Self { ptr, cap, handle })
1136        }
1137
1138        fn as_slice(&mut self) -> &mut [MaybeUninit<T>] {
1139            unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.cap) }
1140        }
1141    }
1142
1143    impl<T> Drop for Wrapper<T> {
1144        fn drop(&mut self) {
1145            unsafe { deallocate(self.handle.assume_init_read()) };
1146        }
1147    }
1148
1149    #[test]
1150    #[serial]
1151    fn test_readme() -> Result<(), AllocError> {
1152        initialize();
1153
1154        // Allocate memory
1155        let (ptr, cap, handle) = allocate::<u8>(2 << 20)?;
1156        // SAFETY: `allocate` returns a valid memory region and errors otherwise.
1157        let mut vec = ManuallyDrop::new(unsafe { Vec::from_raw_parts(ptr.as_ptr(), 0, cap) });
1158
1159        // Write into region, make sure not to reallocate vector.
1160        vec.extend_from_slice(&[1, 2, 3, 4]);
1161
1162        // We can read from the vector.
1163        assert_eq!(&*vec, &[1, 2, 3, 4]);
1164
1165        // Deallocate after use
1166        deallocate(handle);
1167        Ok(())
1168    }
1169
1170    #[test]
1171    #[serial]
1172    fn test_1() -> Result<(), AllocError> {
1173        initialize();
1174        <Wrapper<u8>>::allocate(4 << 20)?.as_slice()[0] = MaybeUninit::new(1);
1175        Ok(())
1176    }
1177
1178    #[test]
1179    #[serial]
1180    fn test_3() -> Result<(), AllocError> {
1181        initialize();
1182        let until = Arc::new(AtomicBool::new(true));
1183
1184        let inner = || {
1185            let until = Arc::clone(&until);
1186            move || {
1187                let mut i = 0;
1188                let until = &*until;
1189                while until.load(Ordering::Relaxed) {
1190                    i += 1;
1191                    let mut r = <Wrapper<u8>>::allocate(4 << 20).unwrap();
1192                    r.as_slice()[0] = MaybeUninit::new(1);
1193                }
1194                println!("repetitions: {i}");
1195            }
1196        };
1197        let handles = [
1198            std::thread::spawn(inner()),
1199            std::thread::spawn(inner()),
1200            std::thread::spawn(inner()),
1201            std::thread::spawn(inner()),
1202        ];
1203        std::thread::sleep(Duration::from_secs(4));
1204        until.store(false, Ordering::Relaxed);
1205        for handle in handles {
1206            handle.join().unwrap();
1207        }
1208        // std::thread::sleep(Duration::from_secs(600));
1209        Ok(())
1210    }
1211
1212    #[test]
1213    #[serial]
1214    fn test_4() -> Result<(), AllocError> {
1215        initialize();
1216        let until = Arc::new(AtomicBool::new(true));
1217
1218        let inner = || {
1219            let until = Arc::clone(&until);
1220            move || {
1221                let mut i = 0;
1222                let until = &*until;
1223                let batch = 64;
1224                let mut buffer = Vec::with_capacity(batch);
1225                while until.load(Ordering::Relaxed) {
1226                    i += 64;
1227                    buffer.extend((0..batch).map(|_| {
1228                        let mut r = <Wrapper<u8>>::allocate(2 << 20).unwrap();
1229                        r.as_slice()[0] = MaybeUninit::new(1);
1230                        r
1231                    }));
1232                    buffer.clear();
1233                }
1234                println!("repetitions vec: {i}");
1235            }
1236        };
1237        let handles = [
1238            std::thread::spawn(inner()),
1239            std::thread::spawn(inner()),
1240            std::thread::spawn(inner()),
1241            std::thread::spawn(inner()),
1242        ];
1243        std::thread::sleep(Duration::from_secs(4));
1244        until.store(false, Ordering::Relaxed);
1245        for handle in handles {
1246            handle.join().unwrap();
1247        }
1248        std::thread::sleep(Duration::from_secs(1));
1249        let stats = lgalloc_stats();
1250        for size_class in &stats.size_class {
1251            println!("size_class {:?}", size_class);
1252        }
1253        match stats.file_stats {
1254            Ok(file_stats) => {
1255                for file_stats in file_stats {
1256                    println!("file_stats {:?}", file_stats);
1257                }
1258            }
1259            Err(e) => eprintln!("error: {e}"),
1260        }
1261        Ok(())
1262    }
1263
1264    #[test]
1265    #[serial]
1266    fn leak() -> Result<(), AllocError> {
1267        lgalloc_set_config(&LgAlloc {
1268            enabled: Some(true),
1269            path: Some(std::env::temp_dir()),
1270            ..Default::default()
1271        });
1272        let r = <Wrapper<u8>>::allocate(1000)?;
1273
1274        let thread = std::thread::spawn(move || drop(r));
1275
1276        thread.join().unwrap();
1277        Ok(())
1278    }
1279
1280    #[test]
1281    #[serial]
1282    fn test_zst() -> Result<(), AllocError> {
1283        initialize();
1284        <Wrapper<()>>::allocate(10)?;
1285        Ok(())
1286    }
1287
1288    #[test]
1289    #[serial]
1290    fn test_zero_capacity_zst() -> Result<(), AllocError> {
1291        initialize();
1292        <Wrapper<()>>::allocate(0)?;
1293        Ok(())
1294    }
1295
1296    #[test]
1297    #[serial]
1298    fn test_zero_capacity_nonzst() -> Result<(), AllocError> {
1299        initialize();
1300        <Wrapper<()>>::allocate(0)?;
1301        Ok(())
1302    }
1303
1304    #[test]
1305    #[serial]
1306    fn test_stats() -> Result<(), AllocError> {
1307        initialize();
1308        let (_ptr, _cap, handle) = allocate::<usize>(1024)?;
1309        deallocate(handle);
1310
1311        let stats = lgalloc_stats();
1312
1313        assert!(!stats.size_class.is_empty());
1314
1315        Ok(())
1316    }
1317
1318    #[test]
1319    #[serial]
1320    fn test_disable() {
1321        lgalloc_set_config(&*LgAlloc::new().disable());
1322        assert!(matches!(allocate::<u8>(1024), Err(AllocError::Disabled)));
1323    }
1324}