mysql_async/conn/pool/
recycler.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
// Copyright (c) 2019 mysql_async contributors
//
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. All files in the project carrying such notice may not be copied,
// modified, or distributed except according to those terms.

use futures_core::stream::Stream;
use futures_util::{stream::futures_unordered::FuturesUnordered, FutureExt};
use tokio::sync::mpsc;

use std::{
    future::Future,
    pin::Pin,
    sync::{atomic::Ordering, Arc},
    task::{Context, Poll},
};

use super::{IdlingConn, Inner};
use crate::{queryable::transaction::TxStatus, BoxFuture, Conn, PoolOpts};
use tokio::sync::mpsc::UnboundedReceiver;

#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub(crate) struct Recycler {
    inner: Arc<Inner>,
    discard: FuturesUnordered<BoxFuture<'static, ()>>,
    discarded: usize,
    cleaning: FuturesUnordered<BoxFuture<'static, Conn>>,
    reset: FuturesUnordered<BoxFuture<'static, Conn>>,

    // Option<Conn> so that we have a way to send a "I didn't make a Conn after all" signal
    dropped: mpsc::UnboundedReceiver<Option<Conn>>,
    /// Pool options.
    pool_opts: PoolOpts,
    eof: bool,
}

impl Recycler {
    pub fn new(
        pool_opts: PoolOpts,
        inner: Arc<Inner>,
        dropped: UnboundedReceiver<Option<Conn>>,
    ) -> Self {
        Self {
            inner,
            discard: FuturesUnordered::new(),
            discarded: 0,
            cleaning: FuturesUnordered::new(),
            reset: FuturesUnordered::new(),
            dropped,
            pool_opts,
            eof: false,
        }
    }
}

impl Future for Recycler {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut close = self.inner.close.load(Ordering::Acquire);

        macro_rules! conn_return {
            ($self:ident, $conn:ident, $pool_is_closed: expr) => {{
                let mut exchange = $self.inner.exchange.lock().unwrap();
                if $pool_is_closed || exchange.available.len() >= $self.pool_opts.active_bound() {
                    drop(exchange);
                    $self.discard.push($conn.close_conn().boxed());
                } else {
                    exchange.available.push_back($conn.into());
                    if let Some(w) = exchange.waiting.pop() {
                        w.wake();
                    }
                }
            }};
        }

        macro_rules! conn_decision {
            ($self:ident, $conn:ident) => {
                if $conn.inner.stream.is_none() || $conn.inner.disconnected {
                    // drop unestablished connection
                    $self.discard.push(futures_util::future::ok(()).boxed());
                } else if $conn.inner.tx_status != TxStatus::None || $conn.has_pending_result() {
                    $self.cleaning.push($conn.cleanup_for_pool().boxed());
                } else if $conn.expired() || close {
                    $self.discard.push($conn.close_conn().boxed());
                } else if $conn.inner.reset_upon_returning_to_a_pool {
                    $self.reset.push($conn.reset_for_pool().boxed());
                } else {
                    conn_return!($self, $conn, false);
                }
            };
        }

        while !self.eof {
            // see if there are more connections for us to recycle
            match Pin::new(&mut self.dropped).poll_recv(cx) {
                Poll::Ready(Some(Some(conn))) => {
                    assert!(conn.inner.pool.is_none());
                    conn_decision!(self, conn);
                }
                Poll::Ready(Some(None)) => {
                    // someone signaled us that it's exit time
                    close = self.inner.close.load(Ordering::Acquire);
                    assert!(close);
                    continue;
                }
                Poll::Ready(None) => {
                    // no more connections are coming -- time to exit!
                    self.inner.close.store(true, Ordering::Release);
                    self.eof = true;
                    close = true;
                }
                Poll::Pending => {
                    // nope -- but let's still make progress on the ones we have
                    break;
                }
            }
        }

        // if we've been asked to close, reclaim any idle connections
        if close || self.eof {
            while let Some(IdlingConn { conn, .. }) =
                self.inner.exchange.lock().unwrap().available.pop_front()
            {
                assert!(conn.inner.pool.is_none());
                conn_decision!(self, conn);
            }
        }

        // are any dirty connections ready for us to reclaim?
        loop {
            match Pin::new(&mut self.cleaning).poll_next(cx) {
                Poll::Pending | Poll::Ready(None) => break,
                Poll::Ready(Some(Ok(conn))) => conn_decision!(self, conn),
                Poll::Ready(Some(Err(e))) => {
                    // an error occurred while cleaning a connection.
                    // what do we do? replace it with a new connection?
                    // for a conn to end up in cleaning, it must have come through .dropped.
                    // anything that comes through .dropped we know has .pool.is_none().
                    // therefore, dropping the conn won't decrement .exist, so we need to do that.
                    self.discarded += 1;
                    // NOTE: we're discarding the error here
                    let _ = e;
                }
            }
        }

        // let's iterate through connections being successfully reset
        loop {
            match Pin::new(&mut self.reset).poll_next(cx) {
                Poll::Pending | Poll::Ready(None) => break,
                Poll::Ready(Some(Ok(conn))) => conn_return!(self, conn, close),
                Poll::Ready(Some(Err(e))) => {
                    // an error during reset.
                    // replace with a new connection
                    self.discarded += 1;
                    // NOTE: we're discarding the error here
                    let _ = e;
                }
            }
        }

        // are there any torn-down connections for us to deal with?
        loop {
            match Pin::new(&mut self.discard).poll_next(cx) {
                Poll::Pending | Poll::Ready(None) => break,
                Poll::Ready(Some(Ok(()))) => {
                    // yes! count it.
                    // note that we must decrement .exist since the connection does not have a
                    // .pool, and therefore won't do anything useful when it is dropped.
                    self.discarded += 1
                }
                Poll::Ready(Some(Err(e))) => {
                    // an error occurred while closing a connection.
                    // what do we do? we still replace it with a new connection..
                    self.discarded += 1;
                    // NOTE: we're discarding the error here
                    let _ = e;
                }
            }
        }

        if self.discarded != 0 {
            // we need to open up slots for new connctions to be established!
            let mut exchange = self.inner.exchange.lock().unwrap();
            exchange.exist -= self.discarded;
            for _ in 0..self.discarded {
                if let Some(w) = exchange.waiting.pop() {
                    w.wake();
                }
            }
            drop(exchange);
            self.discarded = 0;
        }

        // NOTE: we are asserting here that no more connections will ever be returned to
        // us. see the explanation in Pool::poll_new_conn for why this is okay, even during
        // races on .exist
        let effectively_eof = close && self.inner.exchange.lock().unwrap().exist == 0;

        if (self.eof || effectively_eof)
            && self.cleaning.is_empty()
            && self.discard.is_empty()
            && self.reset.is_empty()
        {
            // we know that all Pool handles have been dropped (self.dropped.poll returned None).

            // if this assertion fails, where are the remaining connections?
            assert_eq!(self.inner.exchange.lock().unwrap().available.len(), 0);

            // NOTE: it is _necessary_ that we set this _before_ we call .wake
            // otherwise, the following may happen to the DisconnectPool future:
            //
            //  - We wake all in .wake
            //  - DisconnectPool::poll adds to .wake
            //  - DisconnectPool::poll reads .closed == false
            //  - We set .closed = true
            //
            // At this point, DisconnectPool::poll will never be notified again.
            self.inner.closed.store(true, Ordering::Release);
        }

        if self.inner.closed.load(Ordering::Acquire) {
            // `DisconnectPool` might still wait to be woken up.
            let mut exchange = self.inner.exchange.lock().unwrap();
            while let Some(w) = exchange.waiting.pop() {
                w.wake();
            }
            // we're about to exit, so there better be no outstanding connections
            assert_eq!(exchange.exist, 0);
            assert_eq!(exchange.available.len(), 0);
            drop(exchange);

            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

impl Drop for Recycler {
    fn drop(&mut self) {
        if !self.inner.closed.load(Ordering::Acquire) {
            // user did not wait for outstanding connections to finish!
            // this is not good -- we won't be able to shut down our connections cleanly
            // all we can do is try to ensure a clean shutdown
            self.inner.close.store(true, Ordering::SeqCst);
        }
    }
}