#![deny(missing_docs)]
//! A size-classed, file-backed memory allocator for large allocations.
//!
//! Regions are carved out of memory-mapped temporary files, cached in
//! per-thread and global pools, and optionally returned to the operating
//! system by a background worker.

use std::cell::RefCell;
use std::collections::HashMap;
use std::fs::File;
use std::marker::PhantomData;
use std::mem::{take, ManuallyDrop, MaybeUninit};
use std::ops::{Deref, Range};
use std::os::fd::{AsFd, AsRawFd};
use std::path::PathBuf;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex, OnceLock, RwLock};
use std::thread::{JoinHandle, ThreadId};
use std::time::{Duration, Instant};

use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use memmap2::MmapMut;
use numa_maps::NumaMap;
use thiserror::Error;

mod readme {
    #![doc = include_str!("../README.md")]
}

/// Handle to a region of memory managed by lgalloc.
///
/// A handle is returned by [`allocate`] and should eventually be passed back
/// to [`deallocate`] to release the region.
pub struct Handle {
    /// Pointer to the start of the region.
    ptr: NonNull<u8>,
    /// Length of the region in bytes.
    len: usize,
}

unsafe impl Send for Handle {}
unsafe impl Sync for Handle {}

#[allow(clippy::len_without_is_empty)]
impl Handle {
    /// Create a handle for a region of `len` bytes starting at `ptr`.
    fn new(ptr: NonNull<u8>, len: usize) -> Self {
        Self { ptr, len }
    }

    /// A handle that does not point at any allocation.
    fn dangling() -> Self {
        Self {
            ptr: NonNull::dangling(),
            len: 0,
        }
    }

    fn is_dangling(&self) -> bool {
        self.ptr == NonNull::dangling()
    }

    /// Length of the region in bytes.
    fn len(&self) -> usize {
        self.len
    }

    /// Pointer to the start of the region.
    fn as_non_null(&self) -> NonNull<u8> {
        self.ptr
    }

    /// Clear the region, eagerly releasing its backing resources where the
    /// platform supports it (`MADV_REMOVE` on Linux).
    fn clear(&mut self) -> std::io::Result<()> {
        unsafe { self.madvise(MADV_DONTNEED_STRATEGY) }
    }

    /// Clear the region by dropping its pages (`MADV_DONTNEED`) without
    /// releasing the underlying file space.
    fn fast_clear(&mut self) -> std::io::Result<()> {
        unsafe { self.madvise(libc::MADV_DONTNEED) }
    }

    /// Call `madvise` on the region with the given advice.
    ///
    /// # Safety
    ///
    /// The handle must describe a live mapping of `len` bytes.
    unsafe fn madvise(&self, advice: libc::c_int) -> std::io::Result<()> {
        let ptr = self.as_non_null().as_ptr().cast();
        let ret = unsafe { libc::madvise(ptr, self.len, advice) };
        if ret != 0 {
            let err = std::io::Error::last_os_error();
            eprintln!("madvise failed: {ret} {err:?}");
            return Err(err);
        }
        Ok(())
    }
}

/// Initial size of a size class's first backing file: 32 MiB.
const INITIAL_SIZE: usize = 32 << 20;

/// The range of valid size classes, as exponents of two: 1 KiB (2^10) up to
/// 64 GiB (2^36).
pub const VALID_SIZE_CLASS: Range<usize> = 10..37;

// Strategy used to clear regions: on Linux, `MADV_REMOVE` also releases the
// backing file space; elsewhere we fall back to `MADV_DONTNEED`.
#[cfg(target_os = "linux")]
const MADV_DONTNEED_STRATEGY: libc::c_int = libc::MADV_REMOVE;

#[cfg(not(target_os = "linux"))]
const MADV_DONTNEED_STRATEGY: libc::c_int = libc::MADV_DONTNEED;

/// Marker type to make a containing type neither `Send` nor `Sync`.
type PhantomUnsyncUnsend<T> = PhantomData<*mut T>;

/// Error returned by lgalloc operations.
#[derive(Error, Debug)]
pub enum AllocError {
    /// An I/O error occurred while backing the allocation with a file.
    #[error("I/O error")]
    Io(#[from] std::io::Error),
    /// The memory pool is exhausted and could not be refilled.
    #[error("Out of memory")]
    OutOfMemory,
    /// The requested size maps to a size class outside [`VALID_SIZE_CLASS`].
    #[error("Invalid size class")]
    InvalidSizeClass(usize),
    /// lgalloc is disabled by configuration.
    #[error("Disabled by configuration")]
    Disabled,
    /// The provided memory does not satisfy the requested alignment.
    #[error("Memory unsuitable for requested alignment")]
    UnalignedMemory,
}

impl AllocError {
    /// Returns `true` if the error is [`AllocError::Disabled`].
    #[must_use]
    pub fn is_disabled(&self) -> bool {
        matches!(self, AllocError::Disabled)
    }
}

/// A size class, i.e., the exponent of a power-of-two allocation size.
///
/// For example, a 3 MiB request rounds up to 4 MiB, which is size class 22.
#[derive(Clone, Copy)]
struct SizeClass(usize);

impl SizeClass {
    const fn new_unchecked(value: usize) -> Self {
        Self(value)
    }

    /// Index of this size class within [`VALID_SIZE_CLASS`].
    const fn index(self) -> usize {
        self.0 - VALID_SIZE_CLASS.start
    }

    /// The size of allocations in this size class, in bytes.
    const fn byte_size(self) -> usize {
        1 << self.0
    }

    /// Inverse of [`SizeClass::index`].
    const fn from_index(index: usize) -> Self {
        Self(index + VALID_SIZE_CLASS.start)
    }

    /// The size class that fits `byte_size` bytes, or an error if it falls
    /// outside [`VALID_SIZE_CLASS`].
    fn from_byte_size(byte_size: usize) -> Result<Self, AllocError> {
        let class = byte_size.next_power_of_two().trailing_zeros() as usize;
        class.try_into()
    }

    /// Like [`SizeClass::from_byte_size`], but without validating the class.
    const fn from_byte_size_unchecked(byte_size: usize) -> Self {
        Self::new_unchecked(byte_size.next_power_of_two().trailing_zeros() as usize)
    }
}

impl TryFrom<usize> for SizeClass {
    type Error = AllocError;

    fn try_from(value: usize) -> Result<Self, Self::Error> {
        if VALID_SIZE_CLASS.contains(&value) {
            Ok(SizeClass(value))
        } else {
            Err(AllocError::InvalidSizeClass(value))
        }
    }
}

/// Allocation statistics, tracked per thread and folded into the global
/// per-size-class statistics when a thread exits.
#[derive(Default, Debug)]
struct AllocStats {
    allocations: AtomicU64,
    slow_path: AtomicU64,
    refill: AtomicU64,
    deallocations: AtomicU64,
    clear_eager: AtomicU64,
    clear_slow: AtomicU64,
}

/// The global state shared by all threads.
static INJECTOR: OnceLock<GlobalStealer> = OnceLock::new();

/// Whether lgalloc is enabled. When disabled, [`allocate`] returns
/// [`AllocError::Disabled`].
static LGALLOC_ENABLED: AtomicBool = AtomicBool::new(false);

/// Whether regions returned past the local buffer limit are eagerly cleared.
static LGALLOC_EAGER_RETURN: AtomicBool = AtomicBool::new(false);

/// Dampener for file growth: the higher the value, the slower backing files
/// grow relative to their current size.
static LGALLOC_FILE_GROWTH_DAMPENER: AtomicUsize = AtomicUsize::new(0);

/// Target number of bytes each thread buffers locally per size class.
static LOCAL_BUFFER_BYTES: AtomicUsize = AtomicUsize::new(32 << 20);

/// Global allocator state: per-size-class pools plus configuration shared by
/// all threads.
struct GlobalStealer {
    /// State per size class, indexed by [`SizeClass::index`].
    size_classes: Vec<SizeClassState>,
    /// Directory in which backing files are created.
    path: RwLock<Option<PathBuf>>,
    /// Join handle and channel of the background worker, if it has been started.
    background_sender: Mutex<Option<(JoinHandle<()>, Sender<BackgroundWorkerConfig>)>>,
}

/// Per-size-class state.
#[derive(Default)]
struct SizeClassState {
    /// Backing files and their memory maps. Entries are never dropped because
    /// handed-out regions point into them.
    areas: RwLock<Vec<ManuallyDrop<(File, MmapMut)>>>,
    /// Global pool of regions returned by threads.
    injector: Injector<Handle>,
    /// Pool of regions that are known to be clean (freshly mapped or cleared).
    clean_injector: Injector<Handle>,
    /// Lock serializing refills of this size class.
    lock: Mutex<()>,
    /// Per-thread stealers, used to steal regions from other threads' buffers.
    stealers: RwLock<HashMap<ThreadId, PerThreadState<Handle>>>,
    /// Global statistics, merged from exited threads and the background worker.
    alloc_stats: AllocStats,
}

impl GlobalStealer {
    /// Obtain the global stealer singleton, initializing it on first use.
    fn get_static() -> &'static Self {
        INJECTOR.get_or_init(Self::new)
    }

    /// Obtain the state for a specific size class.
    fn get_size_class(&self, size_class: SizeClass) -> &SizeClassState {
        &self.size_classes[size_class.index()]
    }

    fn new() -> Self {
        let mut size_classes = Vec::with_capacity(VALID_SIZE_CLASS.len());

        for _ in VALID_SIZE_CLASS {
            size_classes.push(SizeClassState::default());
        }

        Self {
            size_classes,
            path: RwLock::default(),
            background_sender: Mutex::default(),
        }
    }
}

impl Drop for GlobalStealer {
    fn drop(&mut self) {
        take(&mut self.size_classes);
    }
}

/// Per-thread state registered with a size class so other threads can steal
/// from this thread's buffer and observe its statistics.
struct PerThreadState<T> {
    stealer: Stealer<T>,
    alloc_stats: Arc<AllocStats>,
}

/// Thread-local allocator state, one [`LocalSizeClass`] per size class.
struct ThreadLocalStealer {
    size_classes: Vec<LocalSizeClass>,
    _phantom: PhantomUnsyncUnsend<Self>,
}

impl ThreadLocalStealer {
    fn new() -> Self {
        let thread_id = std::thread::current().id();
        let size_classes = VALID_SIZE_CLASS
            .map(|size_class| LocalSizeClass::new(SizeClass::new_unchecked(size_class), thread_id))
            .collect();
        Self {
            size_classes,
            _phantom: PhantomData,
        }
    }

    /// Allocate a region of the given size class, or fail if lgalloc is
    /// disabled or out of memory.
    fn allocate(&self, size_class: SizeClass) -> Result<Handle, AllocError> {
        if !LGALLOC_ENABLED.load(Ordering::Relaxed) {
            return Err(AllocError::Disabled);
        }
        self.size_classes[size_class.index()].get_with_refill()
    }

    /// Return a region to the pool of its size class.
    fn deallocate(&self, mem: Handle) {
        let size_class = SizeClass::from_byte_size_unchecked(mem.len());

        self.size_classes[size_class.index()].push(mem);
    }
}

thread_local! {
    static WORKER: RefCell<ThreadLocalStealer> = RefCell::new(ThreadLocalStealer::new());
}

/// Thread-local state for a single size class.
struct LocalSizeClass {
    /// Local buffer of regions.
    worker: Worker<Handle>,
    /// The size class this state belongs to.
    size_class: SizeClass,
    /// Reference to the global state for this size class.
    size_class_state: &'static SizeClassState,
    /// The ID of the owning thread, used to deregister on drop.
    thread_id: ThreadId,
    /// Statistics for this thread and size class.
    stats: Arc<AllocStats>,
    _phantom: PhantomUnsyncUnsend<Self>,
}

impl LocalSizeClass {
    /// Create a new thread-local state and register it with the global state.
    fn new(size_class: SizeClass, thread_id: ThreadId) -> Self {
        let worker = Worker::new_lifo();
        let stealer = GlobalStealer::get_static();
        let size_class_state = stealer.get_size_class(size_class);

        let stats = Arc::new(AllocStats::default());

        let mut lock = size_class_state.stealers.write().unwrap();
        lock.insert(
            thread_id,
            PerThreadState {
                stealer: worker.stealer(),
                alloc_stats: Arc::clone(&stats),
            },
        );

        Self {
            worker,
            size_class,
            size_class_state,
            thread_id,
            stats,
            _phantom: PhantomData,
        }
    }

    /// Get a region from the local buffer, or steal one from the global pools
    /// or from other threads. Does not refill the pools.
    #[inline]
    fn get(&self) -> Result<Handle, AllocError> {
        self.worker
            .pop()
            .or_else(|| {
                std::iter::repeat_with(|| {
                    // Limit steal batches to half the local buffer capacity so
                    // stealing does not overfill the local buffer.
                    let limit = 1.max(
                        LOCAL_BUFFER_BYTES.load(Ordering::Relaxed)
                            / self.size_class.byte_size()
                            / 2,
                    );

                    // Prefer the global injector, then clean regions, and
                    // lastly steal from other threads' buffers.
                    self.size_class_state
                        .injector
                        .steal_batch_with_limit_and_pop(&self.worker, limit)
                        .or_else(|| {
                            self.size_class_state
                                .clean_injector
                                .steal_batch_with_limit_and_pop(&self.worker, limit)
                        })
                        .or_else(|| {
                            self.size_class_state
                                .stealers
                                .read()
                                .unwrap()
                                .values()
                                .map(|state| state.stealer.steal())
                                .collect()
                        })
                })
                .find(|s| !s.is_retry())
                .and_then(Steal::success)
            })
            .ok_or(AllocError::OutOfMemory)
    }

    /// Get a region, refilling the size class from a newly mapped file if all
    /// pools are empty.
    fn get_with_refill(&self) -> Result<Handle, AllocError> {
        self.stats.allocations.fetch_add(1, Ordering::Relaxed);
        match self.get() {
            Err(AllocError::OutOfMemory) => {
                self.stats.slow_path.fetch_add(1, Ordering::Relaxed);
                // Only a single thread at a time refills a size class.
                let _lock = self.size_class_state.lock.lock().unwrap();
                // Another thread might have refilled while we waited for the lock.
                if let Ok(mem) = self.get() {
                    return Ok(mem);
                }
                self.try_refill_and_get()
            }
            r => r,
        }
    }

    /// Return a region to the thread-local buffer, spilling to the global pool
    /// once the local buffer exceeds its configured size.
    fn push(&self, mut mem: Handle) {
        debug_assert_eq!(mem.len(), self.size_class.byte_size());
        self.stats.deallocations.fetch_add(1, Ordering::Relaxed);
        if self.worker.len()
            >= LOCAL_BUFFER_BYTES.load(Ordering::Relaxed) / self.size_class.byte_size()
        {
            if LGALLOC_EAGER_RETURN.load(Ordering::Relaxed) {
                self.stats.clear_eager.fetch_add(1, Ordering::Relaxed);
                mem.fast_clear().expect("clearing successful");
            }
            self.size_class_state.injector.push(mem);
        } else {
            self.worker.push(mem);
        }
    }

    /// Refill the size class by mapping a new, larger file and splitting it
    /// into regions. Returns one region and pushes the rest onto the clean
    /// injector. Called while holding the size class's refill lock.
    fn try_refill_and_get(&self) -> Result<Handle, AllocError> {
        self.stats.refill.fetch_add(1, Ordering::Relaxed);
        let mut stash = self.size_class_state.areas.write().unwrap();

        let initial_capacity = std::cmp::max(1, INITIAL_SIZE / self.size_class.byte_size());

        let last_capacity =
            stash.iter().last().map_or(0, |mmap| mmap.1.len()) / self.size_class.byte_size();
        let growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.load(Ordering::Relaxed);
        // Grow the next file relative to the previous one, dampened by the
        // configured growth dampener, but at least by the initial capacity.
        let next_capacity = last_capacity
            + std::cmp::max(
                initial_capacity,
                last_capacity / (growth_dampener.saturating_add(1)),
            );

        let next_byte_len = next_capacity * self.size_class.byte_size();
        let (file, mut mmap) = Self::init_file(next_byte_len)?;

        // Split the mapping into regions of this size class's byte size.
        let mut chunks = mmap
            .as_mut()
            .chunks_exact_mut(self.size_class.byte_size())
            .map(|chunk| unsafe { NonNull::new_unchecked(chunk.as_mut_ptr()) });

        // Keep one region for the caller, push the rest onto the clean injector.
        let ptr = chunks.next().expect("At least one chunk allocated.");
        let mem = Handle::new(ptr, self.size_class.byte_size());

        for ptr in chunks {
            self.size_class_state
                .clean_injector
                .push(Handle::new(ptr, self.size_class.byte_size()));
        }

        stash.push(ManuallyDrop::new((file, mmap)));
        Ok(mem)
    }

    /// Create a file of `byte_len` bytes in the configured directory and map
    /// it into memory.
    fn init_file(byte_len: usize) -> Result<(File, MmapMut), AllocError> {
        let path = GlobalStealer::get_static().path.read().unwrap().clone();
        let Some(path) = path else {
            return Err(AllocError::Io(std::io::Error::from(
                std::io::ErrorKind::NotFound,
            )));
        };
        let file = tempfile::tempfile_in(path)?;
        let fd = file.as_fd().as_raw_fd();
        // Size the file before mapping it.
        unsafe {
            let ret = libc::ftruncate(fd, libc::off_t::try_from(byte_len).expect("Must fit"));
            if ret != 0 {
                return Err(std::io::Error::last_os_error().into());
            }
        }
        // SAFETY: The file is exclusively owned and sized to `byte_len` bytes.
        let mmap = unsafe { memmap2::MmapOptions::new().map_mut(&file)? };
        assert_eq!(mmap.len(), byte_len);
        Ok((file, mmap))
    }
}

impl Drop for LocalSizeClass {
    fn drop(&mut self) {
        // Deregister this thread's stealer.
        if let Ok(mut lock) = self.size_class_state.stealers.write() {
            lock.remove(&self.thread_id);
        }

        // Move the remaining local regions to the global pool.
        while let Some(mem) = self.worker.pop() {
            self.size_class_state.injector.push(mem);
        }

        let ordering = Ordering::Relaxed;

        // Fold this thread's statistics into the global statistics.
        self.size_class_state
            .alloc_stats
            .allocations
            .fetch_add(self.stats.allocations.load(ordering), ordering);
        let global_stats = &self.size_class_state.alloc_stats;
        global_stats
            .refill
            .fetch_add(self.stats.refill.load(ordering), ordering);
        global_stats
            .slow_path
            .fetch_add(self.stats.slow_path.load(ordering), ordering);
        global_stats
            .deallocations
            .fetch_add(self.stats.deallocations.load(ordering), ordering);
        global_stats
            .clear_slow
            .fetch_add(self.stats.clear_slow.load(ordering), ordering);
        global_stats
            .clear_eager
            .fetch_add(self.stats.clear_eager.load(ordering), ordering);
    }
}

/// Run `f` with the thread-local allocator state.
fn thread_context<R, F: FnOnce(&ThreadLocalStealer) -> R>(f: F) -> R {
    WORKER.with(|cell| f(&cell.borrow()))
}

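/// Allocate a region providing space for at least `capacity` elements of `T`.
///
/// Returns a pointer to the region, the actual capacity in elements, and a
/// [`Handle`] that should eventually be passed to [`deallocate`]. Zero-sized
/// types and zero capacities yield a dangling pointer and a dangling handle.
///
/// # Example
///
/// A usage sketch mirroring the README and the crate's tests; it assumes
/// lgalloc has been enabled and configured with a writable path via
/// [`lgalloc_set_config`] (paths assume the crate name `lgalloc`).
///
/// ```no_run
/// use std::mem::ManuallyDrop;
///
/// lgalloc::lgalloc_set_config(lgalloc::LgAlloc::new().enable().with_path(std::env::temp_dir()));
///
/// // Allocate memory for 2 MiB of `u8` and wrap it in a `Vec` that never
/// // reallocates or frees its buffer.
/// let (ptr, cap, handle) = lgalloc::allocate::<u8>(2 << 20).expect("allocation failed");
/// let mut vec = ManuallyDrop::new(unsafe { Vec::from_raw_parts(ptr.as_ptr(), 0, cap) });
/// vec.extend_from_slice(&[1, 2, 3, 4]);
/// assert_eq!(&*vec, &[1, 2, 3, 4]);
///
/// // Return the region; the `Vec` must not be used afterwards.
/// lgalloc::deallocate(handle);
/// ```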
pub fn allocate<T>(capacity: usize) -> Result<(NonNull<T>, usize, Handle), AllocError> {
    // Zero-sized types and empty allocations do not need backing memory.
    if std::mem::size_of::<T>() == 0 {
        return Ok((NonNull::dangling(), usize::MAX, Handle::dangling()));
    } else if capacity == 0 {
        return Ok((NonNull::dangling(), 0, Handle::dangling()));
    }

    // Round the request up to at least a page.
    let byte_len = std::cmp::max(page_size::get(), std::mem::size_of::<T>() * capacity);
    let size_class = SizeClass::from_byte_size(byte_len)?;

    let handle = thread_context(|s| s.allocate(size_class))?;
    debug_assert_eq!(handle.len(), size_class.byte_size());
    let ptr: NonNull<T> = handle.as_non_null().cast();
    // Return the region if it does not satisfy `T`'s alignment.
    if ptr.as_ptr().align_offset(std::mem::align_of::<T>()) != 0 {
        thread_context(move |s| s.deallocate(handle));
        return Err(AllocError::UnalignedMemory);
    }
    let actual_capacity = handle.len() / std::mem::size_of::<T>();
    Ok((ptr, actual_capacity, handle))
}

/// Return a region of memory to lgalloc.
///
/// Handles produced by zero-sized or empty allocations are ignored.
pub fn deallocate(handle: Handle) {
    if handle.is_dangling() {
        return;
    }
    thread_context(|s| s.deallocate(handle));
}

/// Background worker that periodically clears returned regions and moves them
/// to the clean pool.
struct BackgroundWorker {
    config: BackgroundWorkerConfig,
    receiver: Receiver<BackgroundWorkerConfig>,
    global_stealer: &'static GlobalStealer,
    worker: Worker<Handle>,
}

impl BackgroundWorker {
    fn new(receiver: Receiver<BackgroundWorkerConfig>) -> Self {
        let config = BackgroundWorkerConfig {
            interval: Duration::MAX,
            ..Default::default()
        };
        let global_stealer = GlobalStealer::get_static();
        let worker = Worker::new_fifo();
        Self {
            config,
            receiver,
            global_stealer,
            worker,
        }
    }

    fn run(&mut self) {
        let mut next_cleanup: Option<Instant> = None;
        loop {
            let timeout = next_cleanup.map_or(Duration::MAX, |next_cleanup| {
                next_cleanup.saturating_duration_since(Instant::now())
            });
            match self.receiver.recv_timeout(timeout) {
                Ok(config) => {
                    self.config = config;
                    next_cleanup = None;
                }
                Err(RecvTimeoutError::Disconnected) => break,
                Err(RecvTimeoutError::Timeout) => {
                    self.maintenance();
                }
            }
            next_cleanup = next_cleanup
                .unwrap_or_else(Instant::now)
                .checked_add(self.config.interval);
        }
    }

    fn maintenance(&self) {
        for (index, size_class_state) in self.global_stealer.size_classes.iter().enumerate() {
            let size_class = SizeClass::from_index(index);
            let count = self.clear(size_class, size_class_state, &self.worker);
            size_class_state
                .alloc_stats
                .clear_slow
                .fetch_add(count.try_into().expect("must fit"), Ordering::Relaxed);
        }
    }

    fn clear(
        &self,
        size_class: SizeClass,
        state: &SizeClassState,
        worker: &Worker<Handle>,
    ) -> usize {
        // Clear at most `clear_bytes` per size class and maintenance run,
        // rounded up to whole regions.
        let byte_size = size_class.byte_size();
        let mut limit = (self.config.clear_bytes + byte_size - 1) / byte_size;
        let mut count = 0;
        let mut steal = Steal::Retry;
        while limit > 0 && !steal.is_empty() {
            steal = std::iter::repeat_with(|| state.injector.steal_batch_with_limit(worker, limit))
                .find(|s| !s.is_retry())
                .unwrap_or(Steal::Empty);
            while let Some(mut mem) = worker.pop() {
                match mem.clear() {
                    Ok(()) => count += 1,
                    Err(e) => panic!("Syscall failed: {e:?}"),
                }
                state.clean_injector.push(mem);
                limit -= 1;
            }
        }
        count
    }
}

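/// Set or update the lgalloc configuration.
///
/// Fields of [`LgAlloc`] that are `None` leave the corresponding setting
/// unchanged. Providing a background worker configuration starts the
/// background thread on first use and reconfigures it afterwards.
///
/// # Example
///
/// A configuration sketch mirroring the crate's tests (paths assume the crate
/// name `lgalloc`):
///
/// ```no_run
/// use std::time::Duration;
///
/// lgalloc::lgalloc_set_config(
///     lgalloc::LgAlloc::new()
///         .enable()
///         .with_path(std::env::temp_dir())
///         .with_background_config(lgalloc::BackgroundWorkerConfig {
///             interval: Duration::from_secs(1),
///             clear_bytes: 4 << 20,
///         }),
/// );
/// ```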
pub fn lgalloc_set_config(config: &LgAlloc) {
    let stealer = GlobalStealer::get_static();

    if let Some(enabled) = &config.enabled {
        LGALLOC_ENABLED.store(*enabled, Ordering::Relaxed);
    }

    if let Some(eager_return) = &config.eager_return {
        LGALLOC_EAGER_RETURN.store(*eager_return, Ordering::Relaxed);
    }

    if let Some(path) = &config.path {
        *stealer.path.write().unwrap() = Some(path.clone());
    }

    if let Some(file_growth_dampener) = &config.file_growth_dampener {
        LGALLOC_FILE_GROWTH_DAMPENER.store(*file_growth_dampener, Ordering::Relaxed);
    }

    if let Some(local_buffer_bytes) = &config.local_buffer_bytes {
        LOCAL_BUFFER_BYTES.store(*local_buffer_bytes, Ordering::Relaxed);
    }

    if let Some(config) = config.background_config.clone() {
        let mut lock = stealer.background_sender.lock().unwrap();

        let config = if let Some((_, sender)) = &*lock {
            match sender.send(config) {
                Ok(()) => None,
                Err(err) => Some(err.0),
            }
        } else {
            Some(config)
        };
        if let Some(config) = config {
            let (sender, receiver) = std::sync::mpsc::channel();
            let mut worker = BackgroundWorker::new(receiver);
            let join_handle = std::thread::Builder::new()
                .name("lgalloc-0".to_string())
                .spawn(move || worker.run())
                .expect("thread started successfully");
            sender.send(config).expect("Receiver exists");
            *lock = Some((join_handle, sender));
        }
    }
}

/// Configuration for the background worker.
#[derive(Default, Debug, Clone, Eq, PartialEq)]
pub struct BackgroundWorkerConfig {
    /// Time between two maintenance runs.
    pub interval: Duration,
    /// Maximum number of bytes to clear per size class and maintenance run.
    pub clear_bytes: usize,
}

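/// lgalloc configuration. Each field is optional; fields left as `None` keep
/// their current value when passed to [`lgalloc_set_config`].
///
/// # Example
///
/// A builder-style sketch mirroring the crate's tests (paths assume the crate
/// name `lgalloc`):
///
/// ```no_run
/// let mut config = lgalloc::LgAlloc::new();
/// config
///     .enable()
///     .with_path(std::env::temp_dir())
///     .file_growth_dampener(1);
/// lgalloc::lgalloc_set_config(&config);
/// ```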
#[derive(Default, Clone, Eq, PartialEq)]
pub struct LgAlloc {
    /// Whether lgalloc is enabled.
    pub enabled: Option<bool>,
    /// Directory in which backing files are created.
    pub path: Option<PathBuf>,
    /// Configuration for the background worker.
    pub background_config: Option<BackgroundWorkerConfig>,
    /// Whether to eagerly clear regions returned past the local buffer limit.
    pub eager_return: Option<bool>,
    /// Dampener for the growth of backing files.
    pub file_growth_dampener: Option<usize>,
    /// Target size in bytes of each thread's local buffer per size class.
    pub local_buffer_bytes: Option<usize>,
}

impl LgAlloc {
    /// Create a new configuration with all fields unset.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Enable lgalloc.
    pub fn enable(&mut self) -> &mut Self {
        self.enabled = Some(true);
        self
    }

    /// Disable lgalloc.
    pub fn disable(&mut self) -> &mut Self {
        self.enabled = Some(false);
        self
    }

    /// Set the background worker configuration.
    pub fn with_background_config(&mut self, config: BackgroundWorkerConfig) -> &mut Self {
        self.background_config = Some(config);
        self
    }

    /// Set the directory in which backing files are created.
    pub fn with_path(&mut self, path: PathBuf) -> &mut Self {
        self.path = Some(path);
        self
    }

    /// Set whether regions are eagerly cleared when returned.
    pub fn eager_return(&mut self, eager_return: bool) -> &mut Self {
        self.eager_return = Some(eager_return);
        self
    }

    /// Set the file growth dampener.
    pub fn file_growth_dampener(&mut self, file_growth_dampener: usize) -> &mut Self {
        self.file_growth_dampener = Some(file_growth_dampener);
        self
    }

    /// Set the local buffer size in bytes.
    pub fn local_buffer_bytes(&mut self, local_buffer_bytes: usize) -> &mut Self {
        self.local_buffer_bytes = Some(local_buffer_bytes);
        self
    }
}

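/// Collect statistics about lgalloc's size classes and backing files.
///
/// # Example
///
/// A sketch of how the statistics can be inspected (paths assume the crate
/// name `lgalloc`):
///
/// ```no_run
/// let stats = lgalloc::lgalloc_stats();
/// for size_class in &stats.size_class {
///     println!(
///         "size class {}: {} free regions, {} mapped bytes",
///         size_class.size_class, size_class.free_regions, size_class.area_total_bytes,
///     );
/// }
/// if let Ok(file_stats) = &stats.file_stats {
///     for file in file_stats {
///         println!(
///             "file for size class {}: {} bytes on disk",
///             file.size_class, file.allocated_size,
///         );
///     }
/// }
/// ```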
pub fn lgalloc_stats() -> LgAllocStats {
    let mut numa_map = NumaMap::from_file("/proc/self/numa_maps");

    let global = GlobalStealer::get_static();

    let mut size_classes = Vec::with_capacity(global.size_classes.len());
    let mut file_stats = Vec::new();

    for (index, size_class_state) in global.size_classes.iter().enumerate() {
        let size_class = SizeClass::from_index(index);

        let areas_lock = size_class_state.areas.read().unwrap();

        let areas = areas_lock.len();
        if areas == 0 {
            continue;
        }

        let size_class = size_class.byte_size();
        let area_total_bytes = areas_lock.iter().map(|area| area.1.len()).sum();
        let global_regions = size_class_state.injector.len();
        let clean_regions = size_class_state.clean_injector.len();
        let stealers = size_class_state.stealers.read().unwrap();
        let mut thread_regions = 0;
        let mut allocations = 0;
        let mut deallocations = 0;
        let mut refill = 0;
        let mut slow_path = 0;
        let mut clear_eager_total = 0;
        let mut clear_slow_total = 0;
        for thread_state in stealers.values() {
            thread_regions += thread_state.stealer.len();
            let thread_stats = &*thread_state.alloc_stats;
            allocations += thread_stats.allocations.load(Ordering::Relaxed);
            deallocations += thread_stats.deallocations.load(Ordering::Relaxed);
            refill += thread_stats.refill.load(Ordering::Relaxed);
            slow_path += thread_stats.slow_path.load(Ordering::Relaxed);
            clear_eager_total += thread_stats.clear_eager.load(Ordering::Relaxed);
            clear_slow_total += thread_stats.clear_slow.load(Ordering::Relaxed);
        }

        let free_regions = thread_regions + global_regions + clean_regions;

        let global_stats = &size_class_state.alloc_stats;
        allocations += global_stats.allocations.load(Ordering::Relaxed);
        deallocations += global_stats.deallocations.load(Ordering::Relaxed);
        refill += global_stats.refill.load(Ordering::Relaxed);
        slow_path += global_stats.slow_path.load(Ordering::Relaxed);
        clear_eager_total += global_stats.clear_eager.load(Ordering::Relaxed);
        clear_slow_total += global_stats.clear_slow.load(Ordering::Relaxed);

        size_classes.push(SizeClassStats {
            size_class,
            areas,
            area_total_bytes,
            free_regions,
            global_regions,
            clean_regions,
            thread_regions,
            allocations,
            deallocations,
            refill,
            slow_path,
            clear_eager_total,
            clear_slow_total,
        });

        if let Ok(numa_map) = numa_map.as_mut() {
            let areas = &areas_lock[..];
            file_stats.extend(extract_file_stats(
                size_class,
                numa_map,
                areas.iter().map(Deref::deref),
            ));
        }
    }

    LgAllocStats {
        file_stats: match numa_map {
            Ok(_) => Ok(file_stats),
            Err(err) => Err(err),
        },
        size_class: size_classes,
    }
}

/// Extract per-file statistics for the areas of a size class, combining
/// `numa_maps` information with `fstat` data.
fn extract_file_stats<'a>(
    size_class: usize,
    numa_map: &'a mut NumaMap,
    areas: impl IntoIterator<Item = &'a (File, MmapMut)> + 'a,
) -> impl Iterator<Item = FileStats> + 'a {
    // Prepare the ranges for lookup by address.
    for entry in &mut numa_map.ranges {
        entry.normalize();
    }
    numa_map.ranges.sort();

    areas.into_iter().map(move |(file, mmap)| {
        let (mapped, active, dirty) = {
            let base = mmap.as_ptr().cast::<()>() as usize;
            let range = match numa_map
                .ranges
                .binary_search_by(|range| range.address.cmp(&base))
            {
                Ok(pos) => Some(&numa_map.ranges[pos]),
                // Base address not found, skip the mapping.
                Err(_pos) => None,
            };

            let mut mapped = 0;
            let mut active = 0;
            let mut dirty = 0;
            for property in range.iter().flat_map(|e| e.properties.iter()) {
                match property {
                    numa_maps::Property::Dirty(d) => dirty = *d,
                    numa_maps::Property::Mapped(m) => mapped = *m,
                    numa_maps::Property::Active(a) => active = *a,
                    _ => {}
                }
            }
            (mapped, active, dirty)
        };

        let mut stat: MaybeUninit<libc::stat> = MaybeUninit::uninit();
        // SAFETY: We pass a valid file descriptor and a pointer to enough
        // space for a `stat` structure.
        let ret = unsafe { libc::fstat(file.as_raw_fd(), stat.as_mut_ptr()) };
        let stat = if ret == -1 {
            None
        } else {
            Some(unsafe { stat.assume_init_ref() })
        };

        let (blocks, file_size) = stat.map_or((0, 0), |stat| {
            (
                stat.st_blocks.try_into().unwrap_or(0),
                stat.st_size.try_into().unwrap_or(0),
            )
        });
        FileStats {
            size_class,
            file_size,
            // `st_blocks` counts 512-byte blocks.
            allocated_size: blocks * 512,
            mapped,
            active,
            dirty,
        }
    })
}

/// Statistics about lgalloc's state.
#[derive(Debug)]
pub struct LgAllocStats {
    /// Per-size-class statistics.
    pub size_class: Vec<SizeClassStats>,
    /// Per-file statistics, or an error if `numa_maps` could not be read.
    pub file_stats: Result<Vec<FileStats>, std::io::Error>,
}

/// Statistics per size class.
#[derive(Debug)]
pub struct SizeClassStats {
    /// Size class in bytes.
    pub size_class: usize,
    /// Number of mapped areas backing this size class.
    pub areas: usize,
    /// Total number of bytes in all areas of this size class.
    pub area_total_bytes: usize,
    /// Number of free regions (thread-local, global, and clean).
    pub free_regions: usize,
    /// Number of clean free regions in the global pool.
    pub clean_regions: usize,
    /// Number of free regions in the global pool.
    pub global_regions: usize,
    /// Number of free regions in thread-local buffers.
    pub thread_regions: usize,
    /// Total number of allocations.
    pub allocations: u64,
    /// Total number of allocations that took the slow path.
    pub slow_path: u64,
    /// Total number of refills.
    pub refill: u64,
    /// Total number of deallocations.
    pub deallocations: u64,
    /// Total number of eagerly cleared regions.
    pub clear_eager_total: u64,
    /// Total number of regions cleared by the background worker.
    pub clear_slow_total: u64,
}

/// Statistics per backing file.
#[derive(Debug)]
pub struct FileStats {
    /// Size class in bytes.
    pub size_class: usize,
    /// Size of the file as reported by `fstat`.
    pub file_size: usize,
    /// Number of bytes allocated on disk (`st_blocks * 512`).
    pub allocated_size: usize,
    /// Mapped memory reported by `numa_maps`, if available.
    pub mapped: usize,
    /// Active memory reported by `numa_maps`, if available.
    pub active: usize,
    /// Dirty memory reported by `numa_maps`, if available.
    pub dirty: usize,
}

#[cfg(test)]
mod test {
    use std::mem::{ManuallyDrop, MaybeUninit};
    use std::ptr::NonNull;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    use serial_test::serial;

    use super::*;

    fn initialize() {
        lgalloc_set_config(
            LgAlloc::new()
                .enable()
                .with_background_config(BackgroundWorkerConfig {
                    interval: Duration::from_secs(1),
                    clear_bytes: 4 << 20,
                })
                .with_path(std::env::temp_dir())
                .file_growth_dampener(1),
        );
    }

    struct Wrapper<T> {
        handle: MaybeUninit<Handle>,
        ptr: NonNull<MaybeUninit<T>>,
        cap: usize,
    }

    unsafe impl<T: Send> Send for Wrapper<T> {}
    unsafe impl<T: Sync> Sync for Wrapper<T> {}

    impl<T> Wrapper<T> {
        fn allocate(capacity: usize) -> Result<Self, AllocError> {
            let (ptr, cap, handle) = allocate(capacity)?;
            assert!(cap > 0);
            let handle = MaybeUninit::new(handle);
            Ok(Self { ptr, cap, handle })
        }

        fn as_slice(&mut self) -> &mut [MaybeUninit<T>] {
            unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.cap) }
        }
    }

    impl<T> Drop for Wrapper<T> {
        fn drop(&mut self) {
            unsafe { deallocate(self.handle.assume_init_read()) };
        }
    }

    #[test]
    #[serial]
    fn test_readme() -> Result<(), AllocError> {
        initialize();

        let (ptr, cap, handle) = allocate::<u8>(2 << 20)?;
        let mut vec = ManuallyDrop::new(unsafe { Vec::from_raw_parts(ptr.as_ptr(), 0, cap) });

        vec.extend_from_slice(&[1, 2, 3, 4]);

        assert_eq!(&*vec, &[1, 2, 3, 4]);

        deallocate(handle);
        Ok(())
    }

    #[test]
    #[serial]
    fn test_1() -> Result<(), AllocError> {
        initialize();
        <Wrapper<u8>>::allocate(4 << 20)?.as_slice()[0] = MaybeUninit::new(1);
        Ok(())
    }

    #[test]
    #[serial]
    fn test_3() -> Result<(), AllocError> {
        initialize();
        let until = Arc::new(AtomicBool::new(true));

        let inner = || {
            let until = Arc::clone(&until);
            move || {
                let mut i = 0;
                let until = &*until;
                while until.load(Ordering::Relaxed) {
                    i += 1;
                    let mut r = <Wrapper<u8>>::allocate(4 << 20).unwrap();
                    r.as_slice()[0] = MaybeUninit::new(1);
                }
                println!("repetitions: {i}");
            }
        };
        let handles = [
            std::thread::spawn(inner()),
            std::thread::spawn(inner()),
            std::thread::spawn(inner()),
            std::thread::spawn(inner()),
        ];
        std::thread::sleep(Duration::from_secs(4));
        until.store(false, Ordering::Relaxed);
        for handle in handles {
            handle.join().unwrap();
        }
        Ok(())
    }

    #[test]
    #[serial]
    fn test_4() -> Result<(), AllocError> {
        initialize();
        let until = Arc::new(AtomicBool::new(true));

        let inner = || {
            let until = Arc::clone(&until);
            move || {
                let mut i = 0;
                let until = &*until;
                let batch = 64;
                let mut buffer = Vec::with_capacity(batch);
                while until.load(Ordering::Relaxed) {
                    i += 64;
                    buffer.extend((0..batch).map(|_| {
                        let mut r = <Wrapper<u8>>::allocate(2 << 20).unwrap();
                        r.as_slice()[0] = MaybeUninit::new(1);
                        r
                    }));
                    buffer.clear();
                }
                println!("repetitions vec: {i}");
            }
        };
        let handles = [
            std::thread::spawn(inner()),
            std::thread::spawn(inner()),
            std::thread::spawn(inner()),
            std::thread::spawn(inner()),
        ];
        std::thread::sleep(Duration::from_secs(4));
        until.store(false, Ordering::Relaxed);
        for handle in handles {
            handle.join().unwrap();
        }
        std::thread::sleep(Duration::from_secs(1));
        let stats = lgalloc_stats();
        for size_class in &stats.size_class {
            println!("size_class {:?}", size_class);
        }
        match stats.file_stats {
            Ok(file_stats) => {
                for file_stats in file_stats {
                    println!("file_stats {:?}", file_stats);
                }
            }
            Err(e) => eprintln!("error: {e}"),
        }
        Ok(())
    }

    #[test]
    #[serial]
    fn leak() -> Result<(), AllocError> {
        lgalloc_set_config(&LgAlloc {
            enabled: Some(true),
            path: Some(std::env::temp_dir()),
            ..Default::default()
        });
        let r = <Wrapper<u8>>::allocate(1000)?;

        let thread = std::thread::spawn(move || drop(r));

        thread.join().unwrap();
        Ok(())
    }

    #[test]
    #[serial]
    fn test_zst() -> Result<(), AllocError> {
        initialize();
        <Wrapper<()>>::allocate(10)?;
        Ok(())
    }

    #[test]
    #[serial]
    fn test_zero_capacity_zst() -> Result<(), AllocError> {
        initialize();
        <Wrapper<()>>::allocate(0)?;
        Ok(())
    }

    #[test]
    #[serial]
    fn test_zero_capacity_nonzst() -> Result<(), AllocError> {
        initialize();
        <Wrapper<()>>::allocate(0)?;
        Ok(())
    }

    #[test]
    #[serial]
    fn test_stats() -> Result<(), AllocError> {
        initialize();
        let (_ptr, _cap, handle) = allocate::<usize>(1024)?;
        deallocate(handle);

        let stats = lgalloc_stats();

        assert!(!stats.size_class.is_empty());

        Ok(())
    }

    #[test]
    #[serial]
    fn test_disable() {
        lgalloc_set_config(&*LgAlloc::new().disable());
        assert!(matches!(allocate::<u8>(1024), Err(AllocError::Disabled)));
    }
}