mysql_async/
buffer_pool.rs

1// Copyright (c) 2021 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use crossbeam::queue::ArrayQueue;
10use std::{mem::take, ops::Deref, sync::Arc};
11
12#[derive(Debug)]
13pub struct BufferPool {
14    buffer_size_cap: usize,
15    buffer_init_cap: usize,
16    pool: ArrayQueue<Vec<u8>>,
17}
18
19impl BufferPool {
20    pub fn new() -> Self {
21        let pool_cap = std::env::var("MYSQL_ASYNC_BUFFER_POOL_CAP")
22            .ok()
23            .and_then(|x| x.parse().ok())
24            .unwrap_or(128_usize);
25
26        let buffer_size_cap = std::env::var("MYSQL_ASYNC_BUFFER_SIZE_CAP")
27            .ok()
28            .and_then(|x| x.parse().ok())
29            .unwrap_or(4 * 1024 * 1024);
30
31        let buffer_init_cap = std::env::var("MYSQL_ASYNC_BUFFER_INIT_CAP")
32            .ok()
33            .and_then(|x| x.parse().ok())
34            .unwrap_or(0);
35
36        Self {
37            pool: ArrayQueue::new(pool_cap),
38            buffer_size_cap,
39            buffer_init_cap,
40        }
41    }
42
43    pub fn get(self: &Arc<Self>) -> PooledBuf {
44        let buf = self
45            .pool
46            .pop()
47            .unwrap_or_else(|| Vec::with_capacity(self.buffer_init_cap));
48        debug_assert_eq!(buf.len(), 0);
49        PooledBuf(buf, self.clone())
50    }
51
52    pub fn get_with<T: AsRef<[u8]>>(self: &Arc<Self>, content: T) -> PooledBuf {
53        let mut buf = self.get();
54        buf.as_mut().extend_from_slice(content.as_ref());
55        buf
56    }
57
58    fn put(self: &Arc<Self>, mut buf: Vec<u8>) {
59        // SAFETY:
60        // 1. OK – 0 is always within capacity
61        // 2. OK - nothing to initialize
62        unsafe { buf.set_len(0) }
63
64        buf.shrink_to(self.buffer_size_cap);
65
66        // ArrayQueue will make sure to drop the buffer if capacity is exceeded
67        let _ = self.pool.push(buf);
68    }
69}
70
71impl Default for BufferPool {
72    fn default() -> Self {
73        Self::new()
74    }
75}
76
77#[derive(Debug)]
78pub struct PooledBuf(Vec<u8>, Arc<BufferPool>);
79
80impl AsMut<Vec<u8>> for PooledBuf {
81    fn as_mut(&mut self) -> &mut Vec<u8> {
82        &mut self.0
83    }
84}
85
86impl Deref for PooledBuf {
87    type Target = [u8];
88
89    fn deref(&self) -> &Self::Target {
90        self.0.deref()
91    }
92}
93
94impl Drop for PooledBuf {
95    fn drop(&mut self) {
96        self.1.put(take(&mut self.0))
97    }
98}