mysql_async/
buffer_pool.rs
1use crossbeam::queue::ArrayQueue;
10use std::{mem::take, ops::Deref, sync::Arc};
11
12#[derive(Debug)]
13pub struct BufferPool {
14 buffer_size_cap: usize,
15 buffer_init_cap: usize,
16 pool: ArrayQueue<Vec<u8>>,
17}
18
19impl BufferPool {
20 pub fn new() -> Self {
21 let pool_cap = std::env::var("MYSQL_ASYNC_BUFFER_POOL_CAP")
22 .ok()
23 .and_then(|x| x.parse().ok())
24 .unwrap_or(128_usize);
25
26 let buffer_size_cap = std::env::var("MYSQL_ASYNC_BUFFER_SIZE_CAP")
27 .ok()
28 .and_then(|x| x.parse().ok())
29 .unwrap_or(4 * 1024 * 1024);
30
31 let buffer_init_cap = std::env::var("MYSQL_ASYNC_BUFFER_INIT_CAP")
32 .ok()
33 .and_then(|x| x.parse().ok())
34 .unwrap_or(0);
35
36 Self {
37 pool: ArrayQueue::new(pool_cap),
38 buffer_size_cap,
39 buffer_init_cap,
40 }
41 }
42
43 pub fn get(self: &Arc<Self>) -> PooledBuf {
44 let buf = self
45 .pool
46 .pop()
47 .unwrap_or_else(|| Vec::with_capacity(self.buffer_init_cap));
48 debug_assert_eq!(buf.len(), 0);
49 PooledBuf(buf, self.clone())
50 }
51
52 pub fn get_with<T: AsRef<[u8]>>(self: &Arc<Self>, content: T) -> PooledBuf {
53 let mut buf = self.get();
54 buf.as_mut().extend_from_slice(content.as_ref());
55 buf
56 }
57
58 fn put(self: &Arc<Self>, mut buf: Vec<u8>) {
59 unsafe { buf.set_len(0) }
63
64 buf.shrink_to(self.buffer_size_cap);
65
66 let _ = self.pool.push(buf);
68 }
69}
70
71impl Default for BufferPool {
72 fn default() -> Self {
73 Self::new()
74 }
75}
76
77#[derive(Debug)]
78pub struct PooledBuf(Vec<u8>, Arc<BufferPool>);
79
80impl AsMut<Vec<u8>> for PooledBuf {
81 fn as_mut(&mut self) -> &mut Vec<u8> {
82 &mut self.0
83 }
84}
85
86impl Deref for PooledBuf {
87 type Target = [u8];
88
89 fn deref(&self) -> &Self::Target {
90 self.0.deref()
91 }
92}
93
94impl Drop for PooledBuf {
95 fn drop(&mut self) {
96 self.1.put(take(&mut self.0))
97 }
98}