use futures_util::FutureExt;
use keyed_priority_queue::KeyedPriorityQueue;
use tokio::sync::mpsc;
use std::{
borrow::Borrow,
cmp::Reverse,
collections::VecDeque,
hash::{Hash, Hasher},
str::FromStr,
sync::{atomic, Arc, Mutex},
task::{Context, Poll, Waker},
time::{Duration, Instant},
};
use crate::{
conn::{pool::futures::*, Conn},
error::*,
opts::{Opts, PoolOpts},
queryable::transaction::{Transaction, TxOpts},
};
mod recycler;
pub mod futures;
mod ttl_check_inerval;
#[derive(Debug)]
struct IdlingConn {
since: Instant,
conn: Conn,
}
impl IdlingConn {
fn expired(&self) -> bool {
self.conn
.inner
.ttl_deadline
.map(|t| Instant::now() > t)
.unwrap_or_default()
}
fn elapsed(&self) -> Duration {
self.since.elapsed()
}
}
impl From<Conn> for IdlingConn {
fn from(conn: Conn) -> Self {
Self {
since: Instant::now(),
conn,
}
}
}
#[derive(Debug)]
struct Exchange {
waiting: Waitlist,
available: VecDeque<IdlingConn>,
exist: usize,
recycler: Option<(mpsc::UnboundedReceiver<Option<Conn>>, PoolOpts)>,
}
impl Exchange {
fn spawn_futures_if_needed(&mut self, inner: &Arc<Inner>) {
use recycler::Recycler;
use ttl_check_inerval::TtlCheckInterval;
if let Some((dropped, pool_opts)) = self.recycler.take() {
tokio::spawn(Recycler::new(pool_opts.clone(), inner.clone(), dropped));
if pool_opts.inactive_connection_ttl() > Duration::ZERO
|| pool_opts.abs_conn_ttl().is_some()
{
tokio::spawn(TtlCheckInterval::new(pool_opts, inner.clone()));
}
}
}
}
#[derive(Default, Debug)]
struct Waitlist {
queue: KeyedPriorityQueue<QueuedWaker, QueueId>,
}
impl Waitlist {
fn push(&mut self, waker: Waker, queue_id: QueueId) {
self.remove(queue_id);
self.queue.push(QueuedWaker { queue_id, waker }, queue_id);
}
fn pop(&mut self) -> Option<Waker> {
match self.queue.pop() {
Some((qw, _)) => Some(qw.waker),
None => None,
}
}
fn remove(&mut self, id: QueueId) {
self.queue.remove(&id);
}
fn peek_id(&mut self) -> Option<QueueId> {
self.queue.peek().map(|(qw, _)| qw.queue_id)
}
}
const QUEUE_END_ID: QueueId = QueueId(Reverse(u64::MAX));
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub(crate) struct QueueId(Reverse<u64>);
impl QueueId {
fn next() -> Self {
static NEXT_QUEUE_ID: atomic::AtomicU64 = atomic::AtomicU64::new(0);
let id = NEXT_QUEUE_ID.fetch_add(1, atomic::Ordering::SeqCst);
QueueId(Reverse(id))
}
}
#[derive(Debug)]
struct QueuedWaker {
queue_id: QueueId,
waker: Waker,
}
impl Eq for QueuedWaker {}
impl Borrow<QueueId> for QueuedWaker {
fn borrow(&self) -> &QueueId {
&self.queue_id
}
}
impl PartialEq for QueuedWaker {
fn eq(&self, other: &Self) -> bool {
self.queue_id == other.queue_id
}
}
impl Hash for QueuedWaker {
fn hash<H: Hasher>(&self, state: &mut H) {
self.queue_id.hash(state)
}
}
#[derive(Debug)]
pub struct Inner {
close: atomic::AtomicBool,
closed: atomic::AtomicBool,
exchange: Mutex<Exchange>,
}
#[derive(Debug, Clone)]
pub struct Pool {
opts: Opts,
inner: Arc<Inner>,
drop: mpsc::UnboundedSender<Option<Conn>>,
}
impl Pool {
pub fn new<O>(opts: O) -> Pool
where
Opts: TryFrom<O>,
<Opts as TryFrom<O>>::Error: std::error::Error,
{
let opts = Opts::try_from(opts).unwrap();
let pool_opts = opts.pool_opts().clone();
let (tx, rx) = mpsc::unbounded_channel();
Pool {
opts,
inner: Arc::new(Inner {
close: false.into(),
closed: false.into(),
exchange: Mutex::new(Exchange {
available: VecDeque::with_capacity(pool_opts.constraints().max()),
waiting: Waitlist::default(),
exist: 0,
recycler: Some((rx, pool_opts)),
}),
}),
drop: tx,
}
}
pub fn from_url<T: AsRef<str>>(url: T) -> Result<Pool> {
let opts = Opts::from_str(url.as_ref())?;
Ok(Pool::new(opts))
}
pub fn get_conn(&self) -> GetConn {
let reset_connection = self.opts.pool_opts().reset_connection();
GetConn::new(self, reset_connection)
}
pub async fn start_transaction(&self, options: TxOpts) -> Result<Transaction<'static>> {
let conn = self.get_conn().await?;
Transaction::new(conn, options).await
}
pub fn disconnect(self) -> DisconnectPool {
DisconnectPool::new(self)
}
fn return_conn(&mut self, conn: Conn) {
self.send_to_recycler(conn);
}
fn send_to_recycler(&self, conn: Conn) {
if let Err(conn) = self.drop.send(Some(conn)) {
let conn = conn.0.unwrap();
if !self.inner.closed.load(atomic::Ordering::SeqCst) {
assert!(conn.inner.pool.is_none());
drop(conn);
} else {
unreachable!("Recycler exited while connections still exist");
}
}
}
pub(super) fn cancel_connection(&self) {
let mut exchange = self.inner.exchange.lock().unwrap();
exchange.exist -= 1;
if let Some(w) = exchange.waiting.pop() {
w.wake();
}
}
fn poll_new_conn(
&mut self,
cx: &mut Context<'_>,
queue_id: QueueId,
) -> Poll<Result<GetConnInner>> {
let mut exchange = self.inner.exchange.lock().unwrap();
if self.inner.close.load(atomic::Ordering::Acquire) {
return Err(Error::Driver(DriverError::PoolDisconnected)).into();
}
exchange.spawn_futures_if_needed(&self.inner);
let highest = if let Some(cur) = exchange.waiting.peek_id() {
queue_id > cur
} else {
true
};
if !highest {
exchange.waiting.push(cx.waker().clone(), queue_id);
return Poll::Pending;
}
while let Some(IdlingConn { mut conn, .. }) = exchange.available.pop_back() {
if !conn.expired() {
return Poll::Ready(Ok(GetConnInner::Checking(
async move {
conn.stream_mut()?.check().await?;
Ok(conn)
}
.boxed(),
)));
} else {
self.send_to_recycler(conn);
}
}
if exchange.exist < self.opts.pool_opts().constraints().max() {
exchange.exist += 1;
return Poll::Ready(Ok(GetConnInner::Connecting(
Conn::new(self.opts.clone()).boxed(),
)));
}
exchange.waiting.push(cx.waker().clone(), queue_id);
Poll::Pending
}
fn unqueue(&self, queue_id: QueueId) {
let mut exchange = self.inner.exchange.lock().unwrap();
exchange.waiting.remove(queue_id);
}
}
impl Drop for Conn {
fn drop(&mut self) {
self.inner.infile_handler = None;
if std::thread::panicking() {
if let Some(pool) = self.inner.pool.take() {
pool.cancel_connection();
}
return;
}
if let Some(mut pool) = self.inner.pool.take() {
pool.return_conn(self.take());
} else if self.inner.stream.is_some() && !self.inner.disconnected {
crate::conn::disconnect(self.take());
}
}
}
#[cfg(test)]
mod test {
use futures_util::{
future::{join_all, select, select_all, try_join_all, Either},
poll, try_join, FutureExt,
};
use tokio::time::{sleep, timeout};
use waker_fn::waker_fn;
use std::{
cmp::Reverse,
future::Future,
pin::pin,
sync::{Arc, OnceLock},
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
time::Duration,
};
use crate::{
conn::pool::{Pool, QueueId, Waitlist, QUEUE_END_ID},
opts::PoolOpts,
prelude::*,
test_misc::get_opts,
PoolConstraints, Row, TxOpts, Value,
};
macro_rules! conn_ex_field {
($conn:expr, $field:tt) => {
ex_field!($conn.inner.pool.as_ref().unwrap(), $field)
};
}
macro_rules! ex_field {
($pool:expr, $field:tt) => {
$pool.inner.exchange.lock().unwrap().$field
};
}
fn pool_with_one_connection() -> Pool {
let pool_opts = PoolOpts::new().with_constraints(PoolConstraints::new(1, 1).unwrap());
let opts = get_opts().pool_opts(pool_opts.clone());
Pool::new(opts)
}
#[tokio::test]
async fn should_opt_out_of_connection_reset() -> super::Result<()> {
let pool_opts = PoolOpts::new().with_constraints(PoolConstraints::new(1, 1).unwrap());
let opts = get_opts().pool_opts(pool_opts.clone());
let pool = Pool::new(opts.clone());
let mut conn = pool.get_conn().await.unwrap();
assert_eq!(
conn.query_first::<Value, _>("SELECT @foo").await?.unwrap(),
Value::NULL
);
conn.query_drop("SET @foo = 'foo'").await?;
assert_eq!(
conn.query_first::<String, _>("SELECT @foo").await?.unwrap(),
"foo",
);
drop(conn);
conn = pool.get_conn().await.unwrap();
assert_eq!(
conn.query_first::<Value, _>("SELECT @foo").await?.unwrap(),
Value::NULL
);
conn.query_drop("SET @foo = 'foo'").await?;
conn.reset_connection(false);
drop(conn);
conn = pool.get_conn().await.unwrap();
assert_eq!(
conn.query_first::<String, _>("SELECT @foo").await?.unwrap(),
"foo",
);
drop(conn);
pool.disconnect().await.unwrap();
let pool = Pool::new(opts.pool_opts(pool_opts.with_reset_connection(false)));
conn = pool.get_conn().await.unwrap();
conn.query_drop("SET @foo = 'foo'").await?;
drop(conn);
conn = pool.get_conn().await.unwrap();
assert_eq!(
conn.query_first::<String, _>("SELECT @foo").await?.unwrap(),
"foo",
);
drop(conn);
pool.disconnect().await
}
#[test]
fn should_not_hang() -> super::Result<()> {
pub struct Database {
pool: Pool,
}
impl Database {
pub async fn disconnect(self) -> super::Result<()> {
self.pool.disconnect().await?;
Ok(())
}
}
let runtime = tokio::runtime::Runtime::new().unwrap();
let database = Database {
pool: Pool::new(get_opts()),
};
runtime.block_on(database.disconnect())
}
#[tokio::test]
async fn should_track_conn_if_disconnected_outside_of_a_pool() -> super::Result<()> {
let pool = Pool::new(get_opts());
let conn = pool.get_conn().await?;
conn.disconnect().await?;
pool.disconnect().await?;
Ok(())
}
#[tokio::test]
async fn should_connect() -> super::Result<()> {
let pool = Pool::new(crate::Opts::from(get_opts()));
pool.get_conn().await?.ping().await?;
pool.disconnect().await?;
Ok(())
}
#[tokio::test]
async fn should_reconnect() -> super::Result<()> {
let mut master = crate::Conn::new(get_opts()).await?;
async fn test(master: &mut crate::Conn, opts: crate::OptsBuilder) -> super::Result<()> {
const NUM_CONNS: usize = 5;
let pool = Pool::new(opts);
let connections = (0..NUM_CONNS).map(|_| {
async {
let mut conn = pool.get_conn().await?;
conn.ping().await?;
crate::Result::Ok(conn)
}
.boxed()
});
let ids = try_join_all(connections)
.await?
.into_iter()
.map(|conn| conn.id())
.collect::<Vec<_>>();
sleep(Duration::from_millis(1000)).await;
pool.get_conn().await?;
for id in ids {
master.query_drop(format!("KILL {}", id)).await?;
}
assert_eq!(ex_field!(pool, available).len(), NUM_CONNS);
sleep(Duration::from_millis(500)).await;
let _conn = pool.get_conn().await?;
assert_eq!(ex_field!(pool, available).len(), 0);
drop(_conn);
pool.disconnect().await
}
println!("Check socket/pipe..");
test(&mut master, get_opts()).await?;
println!("Check tcp..");
test(&mut master, get_opts().prefer_socket(false)).await?;
master.disconnect().await
}
#[tokio::test]
async fn should_reuse_connections() -> super::Result<()> {
let pool = pool_with_one_connection();
let mut conn = pool.get_conn().await?;
let server_version = conn.server_version();
let connection_id = conn.id();
for _ in 0..16 {
drop(conn);
conn = pool.get_conn().await?;
println!("CONN connection_id={}", conn.id());
assert!(conn.id() == connection_id || server_version < (5, 7, 2));
}
Ok(())
}
#[tokio::test]
#[ignore]
async fn can_handle_the_pressure() {
let pool = Pool::new(get_opts());
for _ in 0..10i32 {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
for i in 0..10_000 {
let pool = pool.clone();
let tx = tx.clone();
tokio::spawn(async move {
let _ = pool.get_conn().await.unwrap();
tx.send(i).unwrap();
});
}
drop(tx);
while (rx.recv().await).is_some() {}
}
drop(pool);
}
#[tokio::test]
async fn should_start_transaction() -> super::Result<()> {
let pool = pool_with_one_connection();
"CREATE TABLE IF NOT EXISTS mysql.tmp(id int)"
.ignore(&pool)
.await?;
"DELETE FROM mysql.tmp".ignore(&pool).await?;
let mut tx = pool.start_transaction(TxOpts::default()).await?;
tx.exec_batch(
"INSERT INTO mysql.tmp (id) VALUES (?)",
vec![(1_u8,), (2_u8,)],
)
.await?;
tx.exec_drop("SELECT * FROM mysql.tmp", ()).await?;
drop(tx);
let row_opt = pool
.get_conn()
.await?
.query_first("SELECT COUNT(*) FROM mysql.tmp")
.await?;
assert_eq!(row_opt, Some((0u8,)));
pool.get_conn()
.await?
.query_drop("DROP TABLE mysql.tmp")
.await?;
pool.disconnect().await?;
Ok(())
}
#[tokio::test]
async fn should_check_inactive_connection_ttl() -> super::Result<()> {
const POOL_MIN: usize = 5;
const POOL_MAX: usize = 10;
const INACTIVE_CONNECTION_TTL: Duration = Duration::from_millis(500);
const TTL_CHECK_INTERVAL: Duration = Duration::from_secs(1);
let constraints = PoolConstraints::new(POOL_MIN, POOL_MAX).unwrap();
let pool_opts = PoolOpts::default()
.with_constraints(constraints)
.with_inactive_connection_ttl(INACTIVE_CONNECTION_TTL)
.with_ttl_check_interval(TTL_CHECK_INTERVAL);
let pool = Pool::new(get_opts().pool_opts(pool_opts));
let pool_clone = pool.clone();
let conns = (0..POOL_MAX).map(|_| pool.get_conn()).collect::<Vec<_>>();
let conns = try_join_all(conns).await?;
assert_eq!(ex_field!(pool_clone, exist), POOL_MAX);
drop(conns);
sleep(Duration::from_millis(100)).await;
assert_eq!(ex_field!(pool_clone, available).len(), POOL_MAX);
sleep(TTL_CHECK_INTERVAL).await;
sleep(Duration::from_millis(500)).await;
assert_eq!(ex_field!(pool_clone, available).len(), POOL_MIN);
Ok(())
}
#[tokio::test]
async fn aa_should_hold_bounds2() -> super::Result<()> {
use std::cmp::min;
const POOL_MIN: usize = 5;
const POOL_MAX: usize = 10;
let constraints = PoolConstraints::new(POOL_MIN, POOL_MAX).unwrap();
let pool_opts = PoolOpts::default().with_constraints(constraints);
let pool = Pool::new(get_opts().pool_opts(pool_opts));
let pool_clone = pool.clone();
let conns = (0..POOL_MAX).map(|_| pool.get_conn()).collect::<Vec<_>>();
let mut conns = try_join_all(conns).await?;
assert_eq!(ex_field!(pool_clone, exist), POOL_MAX);
while !conns.is_empty() {
let _ = conns.pop();
sleep(Duration::from_millis(500)).await;
let dropped = POOL_MAX - conns.len();
let idle = min(dropped, POOL_MIN);
let expected = conns.len() + idle;
let have = ex_field!(pool_clone, exist);
assert_eq!(have, expected);
}
pool.disconnect().await?;
Ok(())
}
#[tokio::test]
async fn should_hold_bounds1() -> super::Result<()> {
let constraints = PoolConstraints::new(1, 2).unwrap();
let opts = get_opts().pool_opts(PoolOpts::default().with_constraints(constraints));
let pool = Pool::new(opts);
let pool_clone = pool.clone();
let (conn1, conn2) = try_join!(pool.get_conn(), pool.get_conn()).unwrap();
assert_eq!(conn_ex_field!(conn1, exist), 2);
assert_eq!(conn_ex_field!(conn1, available).len(), 0);
drop(conn1);
drop(conn2);
let conn1 = pool_clone.get_conn().await?;
assert_eq!(conn_ex_field!(conn1, available).len(), 0);
drop(conn1);
assert!(ex_field!(pool, available).len() <= 1);
pool.disconnect().await?;
Ok(())
}
#[tokio::test]
async fn should_hold_bounds_on_error() -> super::Result<()> {
let pool = Pool::new("mysql://255.255.255.255");
assert!(try_join!(pool.get_conn(), pool.get_conn()).is_err());
assert_eq!(ex_field!(pool, exist), 0);
Ok(())
}
#[tokio::test]
async fn zz_should_check_wait_timeout_on_get_conn() -> super::Result<()> {
let pool = Pool::new(get_opts());
let mut conn = pool.get_conn().await?;
let wait_timeout_orig: Option<usize> = conn.query_first("SELECT @@wait_timeout").await?;
conn.query_drop("SET GLOBAL wait_timeout = 3").await?;
conn.disconnect().await?;
let mut conn = pool.get_conn().await?;
let wait_timeout: Option<usize> = conn.query_first("SELECT @@wait_timeout").await?;
let id1: Option<usize> = conn.query_first("SELECT CONNECTION_ID()").await?;
drop(conn);
assert_eq!(wait_timeout, Some(3));
assert_eq!(ex_field!(pool, exist), 1);
sleep(Duration::from_secs(6)).await;
let mut conn = pool.get_conn().await?;
let id2: Option<usize> = conn.query_first("SELECT CONNECTION_ID()").await?;
assert_eq!(ex_field!(pool, exist), 1);
assert_ne!(id1, id2);
conn.exec_drop("SET GLOBAL wait_timeout = ?", (wait_timeout_orig,))
.await?;
drop(conn);
pool.disconnect().await?;
Ok(())
}
#[tokio::test]
async fn droptest() -> super::Result<()> {
let pool = Pool::new(get_opts());
let conns = try_join_all((0..10).map(|_| pool.get_conn()))
.await
.unwrap();
drop(conns);
drop(pool);
let pool = Pool::new(get_opts());
let conns = try_join_all((0..10).map(|_| pool.get_conn()))
.await
.unwrap();
drop(pool);
drop(conns);
Ok(())
}
#[test]
fn drop_impl_for_conn_should_not_panic_within_unwind() {
use tokio::runtime;
const PANIC_MESSAGE: &str = "ORIGINAL_PANIC";
let result = std::panic::catch_unwind(|| {
runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
let pool = Pool::new(get_opts());
let _conn = pool.get_conn().await.unwrap();
std::panic::panic_any(PANIC_MESSAGE);
});
});
assert_eq!(
*result.unwrap_err().downcast::<&str>().unwrap(),
"ORIGINAL_PANIC",
);
}
#[test]
fn should_not_panic_on_unclean_shutdown() {
for _ in 0..10 {
let rt = tokio::runtime::Runtime::new().unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
rt.block_on(async move {
let pool = Pool::new(get_opts());
let mut c = pool.get_conn().await.unwrap();
tokio::spawn(async move {
let _ = rx.await;
let _ = c.query_drop("SELECT 1").await;
});
});
drop(rt);
let _ = tx.send(());
}
}
#[test]
fn should_perform_clean_shutdown() {
for _ in 0..10 {
let rt = tokio::runtime::Runtime::new().unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
let jh = rt.spawn(async move {
let pool = Pool::new(get_opts());
let mut c = pool.get_conn().await.unwrap();
tokio::spawn(async move {
let _ = rx.await;
let _ = c.query_drop("SELECT 1").await;
});
let _ = pool.disconnect().await;
});
let _ = tx.send(());
rt.block_on(jh).unwrap();
}
}
#[tokio::test]
async fn issue_126_should_cleanup_errors_in_multiresult_sets() -> super::Result<()> {
let pool_constraints = PoolConstraints::new(0, 1).unwrap();
let pool_opts = PoolOpts::default().with_constraints(pool_constraints);
let pool = Pool::new(get_opts().pool_opts(pool_opts));
for _ in 0u8..100 {
pool.get_conn()
.await?
.query_iter("DO '42'; BLABLA;")
.await?;
}
Ok(())
}
#[tokio::test]
async fn should_ignore_non_fatal_errors_while_returning_to_a_pool() -> super::Result<()> {
let pool = pool_with_one_connection();
let id = pool.get_conn().await?.id();
for _ in 0u8..10 {
let mut conn = pool.get_conn().await?;
conn.query_iter("DO '42'; BLABLA;").await?;
assert_eq!(id, conn.id());
}
Ok(())
}
#[tokio::test]
async fn should_remove_waker_of_cancelled_task() {
let pool = pool_with_one_connection();
let only_conn = pool.get_conn().await.unwrap();
let join_handle = tokio::spawn(timeout(Duration::from_secs(1), pool.get_conn()));
sleep(Duration::from_secs(2)).await;
match join_handle.await.unwrap() {
Err(_elapsed) => (),
_ => panic!("unexpected Ok()"),
}
drop(only_conn);
assert_eq!(0, pool.inner.exchange.lock().unwrap().waiting.queue.len());
}
#[tokio::test]
async fn should_work_if_pooled_connection_operation_is_cancelled() -> super::Result<()> {
let pool = Pool::new(get_opts());
join_all((0..10).map(|_| pool.get_conn())).await;
async fn op(pool: &Pool) {
let _: Option<Row> = pool
.get_conn()
.await
.unwrap()
.exec_first("SELECT ?, ?", (42, "foo"))
.await
.unwrap();
}
let mut max_delay = 0_u128;
for _ in 0..10_usize {
let start = std::time::Instant::now();
op(&pool).await;
max_delay = std::cmp::max(max_delay, start.elapsed().as_micros());
}
for _ in 0_usize..128 {
let fut = select_all((0_usize..5).map(|_| op(&pool).boxed()));
let delay_micros = rand::random::<u128>() % max_delay;
select(
sleep(Duration::from_micros(delay_micros as u64)).boxed(),
fut,
)
.await;
sleep(Duration::from_millis(100)).await;
}
Ok(())
}
#[test]
fn waitlist_integrity() {
const DATA: *const () = &();
const NOOP_CLONE_FN: unsafe fn(*const ()) -> RawWaker = |_| RawWaker::new(DATA, &RW_VTABLE);
const NOOP_FN: unsafe fn(*const ()) = |_| {};
static RW_VTABLE: RawWakerVTable =
RawWakerVTable::new(NOOP_CLONE_FN, NOOP_FN, NOOP_FN, NOOP_FN);
let w = unsafe { Waker::from_raw(RawWaker::new(DATA, &RW_VTABLE)) };
let mut waitlist = Waitlist::default();
assert_eq!(0, waitlist.queue.len());
waitlist.push(w.clone(), QueueId(Reverse(4)));
waitlist.push(w.clone(), QueueId(Reverse(2)));
waitlist.push(w.clone(), QueueId(Reverse(8)));
waitlist.push(w.clone(), QUEUE_END_ID);
waitlist.push(w.clone(), QueueId(Reverse(10)));
waitlist.remove(QueueId(Reverse(8)));
assert_eq!(4, waitlist.queue.len());
let (_, id) = waitlist.queue.pop().unwrap();
assert_eq!(2, id.0 .0);
let (_, id) = waitlist.queue.pop().unwrap();
assert_eq!(4, id.0 .0);
let (_, id) = waitlist.queue.pop().unwrap();
assert_eq!(10, id.0 .0);
let (_, id) = waitlist.queue.pop().unwrap();
assert_eq!(QUEUE_END_ID, id);
assert_eq!(0, waitlist.queue.len());
}
#[tokio::test]
async fn check_absolute_connection_ttl() -> super::Result<()> {
let constraints = PoolConstraints::new(1, 3).unwrap();
let pool_opts = PoolOpts::default()
.with_constraints(constraints)
.with_inactive_connection_ttl(Duration::from_secs(99))
.with_ttl_check_interval(Duration::from_secs(1))
.with_abs_conn_ttl(Some(Duration::from_secs(2)));
let pool = Pool::new(get_opts().pool_opts(pool_opts));
let conn_ttl0 = pool.get_conn().await?;
sleep(Duration::from_millis(1000)).await;
let conn_ttl1 = pool.get_conn().await?;
sleep(Duration::from_millis(1000)).await;
let conn_ttl2 = pool.get_conn().await?;
drop(conn_ttl0);
drop(conn_ttl1);
drop(conn_ttl2);
assert_eq!(ex_field!(pool, exist), 3);
sleep(Duration::from_millis(1500)).await;
assert_eq!(ex_field!(pool, exist), 2);
sleep(Duration::from_millis(1000)).await;
assert_eq!(ex_field!(pool, exist), 1);
sleep(Duration::from_millis(1000)).await;
assert_eq!(ex_field!(pool, exist), 0);
Ok(())
}
#[tokio::test]
async fn save_last_waker() {
let pool = pool_with_one_connection();
let conn = pool.get_conn().await.unwrap();
let mut pending_fut = pin!(pool.get_conn());
let build_waker = || {
let called = Arc::new(OnceLock::new());
let called2 = called.clone();
let waker = waker_fn(move || called2.set(()).unwrap());
(called, waker)
};
let mut assert_pending = |waker| {
let mut context = Context::from_waker(&waker);
let p = pending_fut.as_mut().poll(&mut context);
assert!(matches!(p, Poll::Pending));
};
let (first_called, waker) = build_waker();
assert_pending(waker);
let (second_called, waker) = build_waker();
assert_pending(waker);
drop(conn);
while second_called.get().is_none() {
assert!(first_called.get().is_none());
tokio::time::sleep(Duration::from_millis(100)).await;
}
assert!(first_called.get().is_none());
}
#[tokio::test]
async fn check_priorities() -> super::Result<()> {
let pool = pool_with_one_connection();
let queue_len = || {
let exchange = pool.inner.exchange.lock().unwrap();
exchange.waiting.queue.len()
};
let conn = pool.get_conn().await.unwrap();
#[allow(clippy::async_yields_async)]
let get_pending = || async {
let fut = async {
pool.get_conn().await.unwrap();
}
.shared();
let p = poll!(fut.clone());
assert!(matches!(p, Poll::Pending));
fut
};
let fut1 = get_pending().await;
let fut2 = get_pending().await;
assert_eq!(queue_len(), 2);
drop(conn); while queue_len() != 1 {
tokio::time::sleep(Duration::from_millis(100)).await;
}
let Either::Right((_, fut2)) = select(fut2, fut1).await else {
panic!("wrong future");
};
assert_eq!(queue_len(), 1);
let p = poll!(fut2.clone());
assert!(matches!(p, Poll::Pending));
assert_eq!(queue_len(), 1); fut2.await;
assert_eq!(queue_len(), 0);
let fut3 = get_pending().await;
assert_eq!(queue_len(), 1);
fut3.await;
Ok(())
}
#[cfg(feature = "nightly")]
mod bench {
use futures_util::future::{FutureExt, TryFutureExt};
use tokio::runtime::Runtime;
use crate::{prelude::Queryable, test_misc::get_opts, Pool, PoolConstraints, PoolOpts};
use std::time::Duration;
#[bench]
fn get_conn(bencher: &mut test::Bencher) {
let mut runtime = Runtime::new().unwrap();
let pool = Pool::new(get_opts());
bencher.iter(|| {
let fut = pool
.get_conn()
.and_then(|mut conn| async { conn.ping().await.map(|_| conn) });
runtime.block_on(fut).unwrap();
});
runtime.block_on(pool.disconnect()).unwrap();
}
#[bench]
fn new_conn_on_pool_soft_boundary(bencher: &mut test::Bencher) {
let mut runtime = Runtime::new().unwrap();
let pool_constraints = PoolConstraints::new(0, 1).unwrap();
let pool_opts = PoolOpts::default()
.with_constraints(pool_constraints)
.with_inactive_connection_ttl(Duration::from_secs(1));
let pool = Pool::new(get_opts().pool_opts(pool_opts));
bencher.iter(|| {
let fut = pool.get_conn().map(drop);
runtime.block_on(fut);
});
runtime.block_on(pool.disconnect()).unwrap();
}
}
}