mysql_async/conn/pool/
ttl_check_inerval.rs

// Copyright (c) 2019 Anatoly Ikorsky
//
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. All files in the project carrying such notice may not be copied,
// modified, or distributed except according to those terms.
8
9use futures_util::future::{ok, FutureExt};
10use tokio::time::{self, Interval};
11
12use std::{
13    collections::VecDeque,
14    future::Future,
15    sync::{atomic::Ordering, Arc},
16};
17
18use super::Inner;
19use crate::PoolOpts;
20use futures_core::task::{Context, Poll};
21use std::pin::Pin;
22
23/// Idling connections TTL check interval.
24///
25/// The purpose of this interval is to remove idling connections that both:
26/// * overflows min bound of the pool;
27/// * idles longer then `inactive_connection_ttl`.
28pub(crate) struct TtlCheckInterval {
29    inner: Arc<Inner>,
30    interval: Interval,
31    pool_opts: PoolOpts,
32}
33
34impl TtlCheckInterval {
35    /// Creates new `TtlCheckInterval`.
36    pub fn new(pool_opts: PoolOpts, inner: Arc<Inner>) -> Self {
37        let interval = time::interval(pool_opts.ttl_check_interval());
38        Self {
39            inner,
40            interval,
41            pool_opts,
42        }
43    }
44
45    /// Perform the check.
46    pub fn check_ttl(&self) {
47        let to_be_dropped = {
48            let mut exchange = self.inner.exchange.lock().unwrap();
49
50            let num_to_drop = exchange
51                .available
52                .len()
53                .saturating_sub(self.pool_opts.constraints().min());
54
55            let mut to_be_dropped = Vec::<_>::with_capacity(exchange.available.len());
56            let mut kept_available =
57                VecDeque::<_>::with_capacity(self.pool_opts.constraints().max());
58
59            while let Some(conn) = exchange.available.pop_front() {
60                if conn.expired()
61                    || (to_be_dropped.len() < num_to_drop
62                        && conn.elapsed() > self.pool_opts.inactive_connection_ttl())
63                {
64                    to_be_dropped.push(conn);
65                } else {
66                    kept_available.push_back(conn);
67                }
68            }
69            exchange.available = kept_available;
70            self.inner
71                .metrics
72                .connections_in_pool
73                .store(exchange.available.len(), Ordering::Relaxed);
74            to_be_dropped
75        };
76
77        for idling_conn in to_be_dropped {
78            assert!(idling_conn.conn.inner.pool.is_none());
79            let inner = self.inner.clone();
80            tokio::spawn(idling_conn.conn.disconnect().then(move |_| {
81                let mut exchange = inner.exchange.lock().unwrap();
82                exchange.exist -= 1;
83                inner
84                    .metrics
85                    .connection_count
86                    .store(exchange.exist, Ordering::Relaxed);
87                ok::<_, ()>(())
88            }));
89        }
90    }
91}
92
93impl Future for TtlCheckInterval {
94    type Output = ();
95
96    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
97        loop {
98            let _ = futures_core::ready!(Pin::new(&mut self.interval).poll_tick(cx));
99            let close = self.inner.close.load(Ordering::Acquire);
100
101            if !close {
102                self.check_ttl();
103            } else {
104                return Poll::Ready(());
105            }
106        }
107    }
108}