mysql_async/conn/pool/
ttl_check_inerval.rs
1use futures_util::future::{ok, FutureExt};
10use pin_project::pin_project;
11use tokio::time::{self, Interval};
12
13use std::{
14 collections::VecDeque,
15 future::Future,
16 sync::{atomic::Ordering, Arc},
17};
18
19use super::Inner;
20use crate::PoolOpts;
21use futures_core::task::{Context, Poll};
22use std::pin::Pin;
23
24#[pin_project]
30pub(crate) struct TtlCheckInterval {
31 inner: Arc<Inner>,
32 #[pin]
33 interval: Interval,
34 pool_opts: PoolOpts,
35}
36
37impl TtlCheckInterval {
38 pub fn new(pool_opts: PoolOpts, inner: Arc<Inner>) -> Self {
40 let interval = time::interval(pool_opts.ttl_check_interval());
41 Self {
42 inner,
43 interval,
44 pool_opts,
45 }
46 }
47
48 pub fn check_ttl(&self) {
50 let to_be_dropped = {
51 let mut exchange = self.inner.exchange.lock().unwrap();
52
53 let num_to_drop = exchange
54 .available
55 .len()
56 .saturating_sub(self.pool_opts.constraints().min());
57
58 let mut to_be_dropped = Vec::<_>::with_capacity(exchange.available.len());
59 let mut kept_available =
60 VecDeque::<_>::with_capacity(self.pool_opts.constraints().max());
61
62 while let Some(conn) = exchange.available.pop_front() {
63 if conn.expired()
64 || (to_be_dropped.len() < num_to_drop
65 && conn.elapsed() > self.pool_opts.inactive_connection_ttl())
66 {
67 to_be_dropped.push(conn);
68 } else {
69 kept_available.push_back(conn);
70 }
71 }
72 exchange.available = kept_available;
73 to_be_dropped
74 };
75
76 for idling_conn in to_be_dropped {
77 assert!(idling_conn.conn.inner.pool.is_none());
78 let inner = self.inner.clone();
79 tokio::spawn(idling_conn.conn.disconnect().then(move |_| {
80 let mut exchange = inner.exchange.lock().unwrap();
81 exchange.exist -= 1;
82 ok::<_, ()>(())
83 }));
84 }
85 }
86}
87
88impl Future for TtlCheckInterval {
89 type Output = ();
90
91 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
92 loop {
93 let _ = futures_core::ready!(self.as_mut().project().interval.poll_tick(cx));
94 let close = self.inner.close.load(Ordering::Acquire);
95
96 if !close {
97 self.check_ttl();
98 } else {
99 return Poll::Ready(());
100 }
101 }
102 }
103}