mysql_async/conn/pool/
ttl_check_inerval.rs

1// Copyright (c) 2019 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use futures_util::future::{ok, FutureExt};
10use pin_project::pin_project;
11use tokio::time::{self, Interval};
12
13use std::{
14    collections::VecDeque,
15    future::Future,
16    sync::{atomic::Ordering, Arc},
17};
18
19use super::Inner;
20use crate::PoolOpts;
21use futures_core::task::{Context, Poll};
22use std::pin::Pin;
23
24/// Idling connections TTL check interval.
25///
26/// The purpose of this interval is to remove idling connections that both:
27/// * overflows min bound of the pool;
28/// * idles longer then `inactive_connection_ttl`.
29#[pin_project]
30pub(crate) struct TtlCheckInterval {
31    inner: Arc<Inner>,
32    #[pin]
33    interval: Interval,
34    pool_opts: PoolOpts,
35}
36
37impl TtlCheckInterval {
38    /// Creates new `TtlCheckInterval`.
39    pub fn new(pool_opts: PoolOpts, inner: Arc<Inner>) -> Self {
40        let interval = time::interval(pool_opts.ttl_check_interval());
41        Self {
42            inner,
43            interval,
44            pool_opts,
45        }
46    }
47
48    /// Perform the check.
49    pub fn check_ttl(&self) {
50        let to_be_dropped = {
51            let mut exchange = self.inner.exchange.lock().unwrap();
52
53            let num_to_drop = exchange
54                .available
55                .len()
56                .saturating_sub(self.pool_opts.constraints().min());
57
58            let mut to_be_dropped = Vec::<_>::with_capacity(exchange.available.len());
59            let mut kept_available =
60                VecDeque::<_>::with_capacity(self.pool_opts.constraints().max());
61
62            while let Some(conn) = exchange.available.pop_front() {
63                if conn.expired()
64                    || (to_be_dropped.len() < num_to_drop
65                        && conn.elapsed() > self.pool_opts.inactive_connection_ttl())
66                {
67                    to_be_dropped.push(conn);
68                } else {
69                    kept_available.push_back(conn);
70                }
71            }
72            exchange.available = kept_available;
73            to_be_dropped
74        };
75
76        for idling_conn in to_be_dropped {
77            assert!(idling_conn.conn.inner.pool.is_none());
78            let inner = self.inner.clone();
79            tokio::spawn(idling_conn.conn.disconnect().then(move |_| {
80                let mut exchange = inner.exchange.lock().unwrap();
81                exchange.exist -= 1;
82                ok::<_, ()>(())
83            }));
84        }
85    }
86}
87
88impl Future for TtlCheckInterval {
89    type Output = ();
90
91    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
92        loop {
93            let _ = futures_core::ready!(self.as_mut().project().interval.poll_tick(cx));
94            let close = self.inner.close.load(Ordering::Acquire);
95
96            if !close {
97                self.check_ttl();
98            } else {
99                return Poll::Ready(());
100            }
101        }
102    }
103}