mysql_async/conn/pool/
recycler.rs

// Copyright (c) 2019 mysql_async contributors
//
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. All files in the project carrying such notice may not be copied,
// modified, or distributed except according to those terms.
8
9use futures_core::stream::Stream;
10use futures_util::{stream::futures_unordered::FuturesUnordered, FutureExt};
11use tokio::sync::mpsc;
12
13use std::{
14    future::Future,
15    pin::Pin,
16    sync::{atomic::Ordering, Arc},
17    task::{Context, Poll},
18};
19
20use super::{IdlingConn, Inner};
21use crate::{queryable::transaction::TxStatus, BoxFuture, Conn, PoolOpts};
22use tokio::sync::mpsc::UnboundedReceiver;
23
24#[derive(Debug)]
25#[must_use = "futures do nothing unless you `.await` or poll them"]
26pub(crate) struct Recycler {
27    inner: Arc<Inner>,
28    discard: FuturesUnordered<BoxFuture<'static, ()>>,
29    discarded: usize,
30    cleaning: FuturesUnordered<BoxFuture<'static, Conn>>,
31    reset: FuturesUnordered<BoxFuture<'static, Conn>>,
32
33    // Option<Conn> so that we have a way to send a "I didn't make a Conn after all" signal
34    dropped: mpsc::UnboundedReceiver<Option<Conn>>,
35    /// Pool options.
36    pool_opts: PoolOpts,
37    eof: bool,
38}
39
40impl Recycler {
41    pub fn new(
42        pool_opts: PoolOpts,
43        inner: Arc<Inner>,
44        dropped: UnboundedReceiver<Option<Conn>>,
45    ) -> Self {
46        Self {
47            inner,
48            discard: FuturesUnordered::new(),
49            discarded: 0,
50            cleaning: FuturesUnordered::new(),
51            reset: FuturesUnordered::new(),
52            dropped,
53            pool_opts,
54            eof: false,
55        }
56    }
57}
58
59impl Future for Recycler {
60    type Output = ();
61
62    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
63        let mut close = self.inner.close.load(Ordering::Acquire);
64
65        macro_rules! conn_return {
66            ($self:ident, $conn:ident, $pool_is_closed: expr) => {{
67                let mut exchange = $self.inner.exchange.lock().unwrap();
68                if $pool_is_closed || exchange.available.len() >= $self.pool_opts.active_bound() {
69                    drop(exchange);
70                    $self
71                        .inner
72                        .metrics
73                        .discarded_superfluous_connection
74                        .fetch_add(1, Ordering::Relaxed);
75                    $self.discard.push($conn.close_conn().boxed());
76                } else {
77                    $self
78                        .inner
79                        .metrics
80                        .connection_returned_to_pool
81                        .fetch_add(1, Ordering::Relaxed);
82                    $self
83                        .inner
84                        .metrics
85                        .connections_in_pool
86                        .fetch_add(1, Ordering::Relaxed);
87                    #[cfg(feature = "hdrhistogram")]
88                    $self
89                        .inner
90                        .metrics
91                        .connection_active_duration
92                        .lock()
93                        .unwrap()
94                        .saturating_record($conn.inner.active_since.elapsed().as_micros() as u64);
95                    exchange.available.push_back($conn.into());
96                    if let Some(w) = exchange.waiting.pop() {
97                        w.wake();
98                    }
99                }
100            }};
101        }
102
103        macro_rules! conn_decision {
104            ($self:ident, $conn:ident) => {
105                if $conn.inner.stream.is_none() || $conn.inner.disconnected {
106                    // drop unestablished connection
107                    $self
108                        .inner
109                        .metrics
110                        .discarded_unestablished_connection
111                        .fetch_add(1, Ordering::Relaxed);
112                    $self.discard.push(futures_util::future::ok(()).boxed());
113                } else if $conn.inner.tx_status != TxStatus::None || $conn.has_pending_result() {
114                    $self
115                        .inner
116                        .metrics
117                        .dirty_connection_return
118                        .fetch_add(1, Ordering::Relaxed);
119                    $self.cleaning.push($conn.cleanup_for_pool().boxed());
120                } else if $conn.expired() || close {
121                    $self
122                        .inner
123                        .metrics
124                        .discarded_expired_connection
125                        .fetch_add(1, Ordering::Relaxed);
126                    $self.discard.push($conn.close_conn().boxed());
127                } else if $conn.inner.reset_upon_returning_to_a_pool {
128                    $self
129                        .inner
130                        .metrics
131                        .resetting_connection
132                        .fetch_add(1, Ordering::Relaxed);
133                    $self.reset.push($conn.reset_for_pool().boxed());
134                } else {
135                    conn_return!($self, $conn, false);
136                }
137            };
138        }
139
140        while !self.eof {
141            // see if there are more connections for us to recycle
142            match Pin::new(&mut self.dropped).poll_recv(cx) {
143                Poll::Ready(Some(Some(conn))) => {
144                    assert!(conn.inner.pool.is_none());
145                    conn_decision!(self, conn);
146                }
147                Poll::Ready(Some(None)) => {
148                    // someone signaled us that it's exit time
149                    close = self.inner.close.load(Ordering::Acquire);
150                    assert!(close);
151                    continue;
152                }
153                Poll::Ready(None) => {
154                    // no more connections are coming -- time to exit!
155                    self.inner.close.store(true, Ordering::Release);
156                    self.eof = true;
157                    close = true;
158                }
159                Poll::Pending => {
160                    // nope -- but let's still make progress on the ones we have
161                    break;
162                }
163            }
164        }
165
166        // if we've been asked to close, reclaim any idle connections
167        if close || self.eof {
168            while let Some(IdlingConn { conn, .. }) =
169                self.inner.exchange.lock().unwrap().available.pop_front()
170            {
171                assert!(conn.inner.pool.is_none());
172                conn_decision!(self, conn);
173            }
174        }
175
176        // are any dirty connections ready for us to reclaim?
177        loop {
178            match Pin::new(&mut self.cleaning).poll_next(cx) {
179                Poll::Pending | Poll::Ready(None) => break,
180                Poll::Ready(Some(Ok(conn))) => conn_decision!(self, conn),
181                Poll::Ready(Some(Err(e))) => {
182                    // an error occurred while cleaning a connection.
183                    // what do we do? replace it with a new connection?
184                    // for a conn to end up in cleaning, it must have come through .dropped.
185                    // anything that comes through .dropped we know has .pool.is_none().
186                    // therefore, dropping the conn won't decrement .exist, so we need to do that.
187                    self.discarded += 1;
188                    self.inner
189                        .metrics
190                        .discarded_error_during_cleanup
191                        .fetch_add(1, Ordering::Relaxed);
192                    // NOTE: we're discarding the error here
193                    let _ = e;
194                }
195            }
196        }
197
198        // let's iterate through connections being successfully reset
199        loop {
200            match Pin::new(&mut self.reset).poll_next(cx) {
201                Poll::Pending | Poll::Ready(None) => break,
202                Poll::Ready(Some(Ok(conn))) => conn_return!(self, conn, close),
203                Poll::Ready(Some(Err(e))) => {
204                    // an error during reset.
205                    // replace with a new connection
206                    self.discarded += 1;
207                    self.inner
208                        .metrics
209                        .discarded_error_during_cleanup
210                        .fetch_add(1, Ordering::Relaxed);
211                    // NOTE: we're discarding the error here
212                    let _ = e;
213                }
214            }
215        }
216
217        // are there any torn-down connections for us to deal with?
218        loop {
219            match Pin::new(&mut self.discard).poll_next(cx) {
220                Poll::Pending | Poll::Ready(None) => break,
221                Poll::Ready(Some(Ok(()))) => {
222                    // yes! count it.
223                    // note that we must decrement .exist since the connection does not have a
224                    // .pool, and therefore won't do anything useful when it is dropped.
225                    self.discarded += 1
226                }
227                Poll::Ready(Some(Err(e))) => {
228                    // an error occurred while closing a connection.
229                    // what do we do? we still replace it with a new connection..
230                    self.discarded += 1;
231                    self.inner
232                        .metrics
233                        .discarded_error_during_cleanup
234                        .fetch_add(1, Ordering::Relaxed);
235                    // NOTE: we're discarding the error here
236                    let _ = e;
237                }
238            }
239        }
240
241        if self.discarded != 0 {
242            self.inner
243                .metrics
244                .connection_count
245                .fetch_sub(self.discarded, Ordering::Relaxed);
246
247            // we need to open up slots for new connctions to be established!
248            let mut exchange = self.inner.exchange.lock().unwrap();
249            exchange.exist -= self.discarded;
250            for _ in 0..self.discarded {
251                if let Some(w) = exchange.waiting.pop() {
252                    w.wake();
253                }
254            }
255            drop(exchange);
256            self.discarded = 0;
257        }
258
259        // NOTE: we are asserting here that no more connections will ever be returned to
260        // us. see the explanation in Pool::poll_new_conn for why this is okay, even during
261        // races on .exist
262        let effectively_eof = close && self.inner.exchange.lock().unwrap().exist == 0;
263
264        if (self.eof || effectively_eof)
265            && self.cleaning.is_empty()
266            && self.discard.is_empty()
267            && self.reset.is_empty()
268        {
269            // we know that all Pool handles have been dropped (self.dropped.poll returned None).
270
271            // if this assertion fails, where are the remaining connections?
272            assert_eq!(self.inner.exchange.lock().unwrap().available.len(), 0);
273
274            // NOTE: it is _necessary_ that we set this _before_ we call .wake
275            // otherwise, the following may happen to the DisconnectPool future:
276            //
277            //  - We wake all in .wake
278            //  - DisconnectPool::poll adds to .wake
279            //  - DisconnectPool::poll reads .closed == false
280            //  - We set .closed = true
281            //
282            // At this point, DisconnectPool::poll will never be notified again.
283            self.inner.closed.store(true, Ordering::Release);
284        }
285
286        if self.inner.closed.load(Ordering::Acquire) {
287            // `DisconnectPool` might still wait to be woken up.
288            let mut exchange = self.inner.exchange.lock().unwrap();
289            while let Some(w) = exchange.waiting.pop() {
290                w.wake();
291            }
292            // we're about to exit, so there better be no outstanding connections
293            assert_eq!(exchange.exist, 0);
294            assert_eq!(exchange.available.len(), 0);
295            drop(exchange);
296
297            Poll::Ready(())
298        } else {
299            Poll::Pending
300        }
301    }
302}
303
304impl Drop for Recycler {
305    fn drop(&mut self) {
306        if !self.inner.closed.load(Ordering::Acquire) {
307            // user did not wait for outstanding connections to finish!
308            // this is not good -- we won't be able to shut down our connections cleanly
309            // all we can do is try to ensure a clean shutdown
310            self.inner.close.store(true, Ordering::SeqCst);
311        }
312    }
313}