mysql_async/conn/pool/
ttl_check_inerval.rs1use futures_util::future::{ok, FutureExt};
10use tokio::time::{self, Interval};
11
12use std::{
13 collections::VecDeque,
14 future::Future,
15 sync::{atomic::Ordering, Arc},
16};
17
18use super::Inner;
19use crate::PoolOpts;
20use futures_core::task::{Context, Poll};
21use std::pin::Pin;
22
23pub(crate) struct TtlCheckInterval {
29 inner: Arc<Inner>,
30 interval: Interval,
31 pool_opts: PoolOpts,
32}
33
34impl TtlCheckInterval {
35 pub fn new(pool_opts: PoolOpts, inner: Arc<Inner>) -> Self {
37 let interval = time::interval(pool_opts.ttl_check_interval());
38 Self {
39 inner,
40 interval,
41 pool_opts,
42 }
43 }
44
45 pub fn check_ttl(&self) {
47 let to_be_dropped = {
48 let mut exchange = self.inner.exchange.lock().unwrap();
49
50 let num_to_drop = exchange
51 .available
52 .len()
53 .saturating_sub(self.pool_opts.constraints().min());
54
55 let mut to_be_dropped = Vec::<_>::with_capacity(exchange.available.len());
56 let mut kept_available =
57 VecDeque::<_>::with_capacity(self.pool_opts.constraints().max());
58
59 while let Some(conn) = exchange.available.pop_front() {
60 if conn.expired()
61 || (to_be_dropped.len() < num_to_drop
62 && conn.elapsed() > self.pool_opts.inactive_connection_ttl())
63 {
64 to_be_dropped.push(conn);
65 } else {
66 kept_available.push_back(conn);
67 }
68 }
69 exchange.available = kept_available;
70 self.inner
71 .metrics
72 .connections_in_pool
73 .store(exchange.available.len(), Ordering::Relaxed);
74 to_be_dropped
75 };
76
77 for idling_conn in to_be_dropped {
78 assert!(idling_conn.conn.inner.pool.is_none());
79 let inner = self.inner.clone();
80 tokio::spawn(idling_conn.conn.disconnect().then(move |_| {
81 let mut exchange = inner.exchange.lock().unwrap();
82 exchange.exist -= 1;
83 inner
84 .metrics
85 .connection_count
86 .store(exchange.exist, Ordering::Relaxed);
87 ok::<_, ()>(())
88 }));
89 }
90 }
91}
92
93impl Future for TtlCheckInterval {
94 type Output = ();
95
96 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
97 loop {
98 let _ = futures_core::ready!(Pin::new(&mut self.interval).poll_tick(cx));
99 let close = self.inner.close.load(Ordering::Acquire);
100
101 if !close {
102 self.check_ttl();
103 } else {
104 return Poll::Ready(());
105 }
106 }
107 }
108}