mysql_async/conn/pool/recycler.rs
1// Copyright (c) 2019 mysql_async contributors
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use futures_core::stream::Stream;
10use futures_util::{stream::futures_unordered::FuturesUnordered, FutureExt};
11use tokio::sync::mpsc;
12
13use std::{
14 future::Future,
15 pin::Pin,
16 sync::{atomic::Ordering, Arc},
17 task::{Context, Poll},
18};
19
20use super::{IdlingConn, Inner};
21use crate::{queryable::transaction::TxStatus, BoxFuture, Conn, PoolOpts};
22use tokio::sync::mpsc::UnboundedReceiver;
23
24#[derive(Debug)]
25#[must_use = "futures do nothing unless you `.await` or poll them"]
26pub(crate) struct Recycler {
27 inner: Arc<Inner>,
28 discard: FuturesUnordered<BoxFuture<'static, ()>>,
29 discarded: usize,
30 cleaning: FuturesUnordered<BoxFuture<'static, Conn>>,
31 reset: FuturesUnordered<BoxFuture<'static, Conn>>,
32
33 // Option<Conn> so that we have a way to send a "I didn't make a Conn after all" signal
34 dropped: mpsc::UnboundedReceiver<Option<Conn>>,
35 /// Pool options.
36 pool_opts: PoolOpts,
37 eof: bool,
38}
39
40impl Recycler {
41 pub fn new(
42 pool_opts: PoolOpts,
43 inner: Arc<Inner>,
44 dropped: UnboundedReceiver<Option<Conn>>,
45 ) -> Self {
46 Self {
47 inner,
48 discard: FuturesUnordered::new(),
49 discarded: 0,
50 cleaning: FuturesUnordered::new(),
51 reset: FuturesUnordered::new(),
52 dropped,
53 pool_opts,
54 eof: false,
55 }
56 }
57}
58
59impl Future for Recycler {
60 type Output = ();
61
62 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
63 let mut close = self.inner.close.load(Ordering::Acquire);
64
65 macro_rules! conn_return {
66 ($self:ident, $conn:ident, $pool_is_closed: expr) => {{
67 let mut exchange = $self.inner.exchange.lock().unwrap();
68 if $pool_is_closed || exchange.available.len() >= $self.pool_opts.active_bound() {
69 drop(exchange);
70 $self
71 .inner
72 .metrics
73 .discarded_superfluous_connection
74 .fetch_add(1, Ordering::Relaxed);
75 $self.discard.push($conn.close_conn().boxed());
76 } else {
77 $self
78 .inner
79 .metrics
80 .connection_returned_to_pool
81 .fetch_add(1, Ordering::Relaxed);
82 $self
83 .inner
84 .metrics
85 .connections_in_pool
86 .fetch_add(1, Ordering::Relaxed);
87 #[cfg(feature = "hdrhistogram")]
88 $self
89 .inner
90 .metrics
91 .connection_active_duration
92 .lock()
93 .unwrap()
94 .saturating_record($conn.inner.active_since.elapsed().as_micros() as u64);
95 exchange.available.push_back($conn.into());
96 if let Some(w) = exchange.waiting.pop() {
97 w.wake();
98 }
99 }
100 }};
101 }
102
103 macro_rules! conn_decision {
104 ($self:ident, $conn:ident) => {
105 if $conn.inner.stream.is_none() || $conn.inner.disconnected {
106 // drop unestablished connection
107 $self
108 .inner
109 .metrics
110 .discarded_unestablished_connection
111 .fetch_add(1, Ordering::Relaxed);
112 $self.discard.push(futures_util::future::ok(()).boxed());
113 } else if $conn.inner.tx_status != TxStatus::None || $conn.has_pending_result() {
114 $self
115 .inner
116 .metrics
117 .dirty_connection_return
118 .fetch_add(1, Ordering::Relaxed);
119 $self.cleaning.push($conn.cleanup_for_pool().boxed());
120 } else if $conn.expired() || close {
121 $self
122 .inner
123 .metrics
124 .discarded_expired_connection
125 .fetch_add(1, Ordering::Relaxed);
126 $self.discard.push($conn.close_conn().boxed());
127 } else if $conn.inner.reset_upon_returning_to_a_pool {
128 $self
129 .inner
130 .metrics
131 .resetting_connection
132 .fetch_add(1, Ordering::Relaxed);
133 $self.reset.push($conn.reset_for_pool().boxed());
134 } else {
135 conn_return!($self, $conn, false);
136 }
137 };
138 }
139
140 while !self.eof {
141 // see if there are more connections for us to recycle
142 match Pin::new(&mut self.dropped).poll_recv(cx) {
143 Poll::Ready(Some(Some(conn))) => {
144 assert!(conn.inner.pool.is_none());
145 conn_decision!(self, conn);
146 }
147 Poll::Ready(Some(None)) => {
148 // someone signaled us that it's exit time
149 close = self.inner.close.load(Ordering::Acquire);
150 assert!(close);
151 continue;
152 }
153 Poll::Ready(None) => {
154 // no more connections are coming -- time to exit!
155 self.inner.close.store(true, Ordering::Release);
156 self.eof = true;
157 close = true;
158 }
159 Poll::Pending => {
160 // nope -- but let's still make progress on the ones we have
161 break;
162 }
163 }
164 }
165
166 // if we've been asked to close, reclaim any idle connections
167 if close || self.eof {
168 while let Some(IdlingConn { conn, .. }) =
169 self.inner.exchange.lock().unwrap().available.pop_front()
170 {
171 assert!(conn.inner.pool.is_none());
172 conn_decision!(self, conn);
173 }
174 }
175
176 // are any dirty connections ready for us to reclaim?
177 loop {
178 match Pin::new(&mut self.cleaning).poll_next(cx) {
179 Poll::Pending | Poll::Ready(None) => break,
180 Poll::Ready(Some(Ok(conn))) => conn_decision!(self, conn),
181 Poll::Ready(Some(Err(e))) => {
182 // an error occurred while cleaning a connection.
183 // what do we do? replace it with a new connection?
184 // for a conn to end up in cleaning, it must have come through .dropped.
185 // anything that comes through .dropped we know has .pool.is_none().
186 // therefore, dropping the conn won't decrement .exist, so we need to do that.
187 self.discarded += 1;
188 self.inner
189 .metrics
190 .discarded_error_during_cleanup
191 .fetch_add(1, Ordering::Relaxed);
192 // NOTE: we're discarding the error here
193 let _ = e;
194 }
195 }
196 }
197
198 // let's iterate through connections being successfully reset
199 loop {
200 match Pin::new(&mut self.reset).poll_next(cx) {
201 Poll::Pending | Poll::Ready(None) => break,
202 Poll::Ready(Some(Ok(conn))) => conn_return!(self, conn, close),
203 Poll::Ready(Some(Err(e))) => {
204 // an error during reset.
205 // replace with a new connection
206 self.discarded += 1;
207 self.inner
208 .metrics
209 .discarded_error_during_cleanup
210 .fetch_add(1, Ordering::Relaxed);
211 // NOTE: we're discarding the error here
212 let _ = e;
213 }
214 }
215 }
216
217 // are there any torn-down connections for us to deal with?
218 loop {
219 match Pin::new(&mut self.discard).poll_next(cx) {
220 Poll::Pending | Poll::Ready(None) => break,
221 Poll::Ready(Some(Ok(()))) => {
222 // yes! count it.
223 // note that we must decrement .exist since the connection does not have a
224 // .pool, and therefore won't do anything useful when it is dropped.
225 self.discarded += 1
226 }
227 Poll::Ready(Some(Err(e))) => {
228 // an error occurred while closing a connection.
229 // what do we do? we still replace it with a new connection..
230 self.discarded += 1;
231 self.inner
232 .metrics
233 .discarded_error_during_cleanup
234 .fetch_add(1, Ordering::Relaxed);
235 // NOTE: we're discarding the error here
236 let _ = e;
237 }
238 }
239 }
240
241 if self.discarded != 0 {
242 self.inner
243 .metrics
244 .connection_count
245 .fetch_sub(self.discarded, Ordering::Relaxed);
246
247 // we need to open up slots for new connctions to be established!
248 let mut exchange = self.inner.exchange.lock().unwrap();
249 exchange.exist -= self.discarded;
250 for _ in 0..self.discarded {
251 if let Some(w) = exchange.waiting.pop() {
252 w.wake();
253 }
254 }
255 drop(exchange);
256 self.discarded = 0;
257 }
258
259 // NOTE: we are asserting here that no more connections will ever be returned to
260 // us. see the explanation in Pool::poll_new_conn for why this is okay, even during
261 // races on .exist
262 let effectively_eof = close && self.inner.exchange.lock().unwrap().exist == 0;
263
264 if (self.eof || effectively_eof)
265 && self.cleaning.is_empty()
266 && self.discard.is_empty()
267 && self.reset.is_empty()
268 {
269 // we know that all Pool handles have been dropped (self.dropped.poll returned None).
270
271 // if this assertion fails, where are the remaining connections?
272 assert_eq!(self.inner.exchange.lock().unwrap().available.len(), 0);
273
274 // NOTE: it is _necessary_ that we set this _before_ we call .wake
275 // otherwise, the following may happen to the DisconnectPool future:
276 //
277 // - We wake all in .wake
278 // - DisconnectPool::poll adds to .wake
279 // - DisconnectPool::poll reads .closed == false
280 // - We set .closed = true
281 //
282 // At this point, DisconnectPool::poll will never be notified again.
283 self.inner.closed.store(true, Ordering::Release);
284 }
285
286 if self.inner.closed.load(Ordering::Acquire) {
287 // `DisconnectPool` might still wait to be woken up.
288 let mut exchange = self.inner.exchange.lock().unwrap();
289 while let Some(w) = exchange.waiting.pop() {
290 w.wake();
291 }
292 // we're about to exit, so there better be no outstanding connections
293 assert_eq!(exchange.exist, 0);
294 assert_eq!(exchange.available.len(), 0);
295 drop(exchange);
296
297 Poll::Ready(())
298 } else {
299 Poll::Pending
300 }
301 }
302}
303
304impl Drop for Recycler {
305 fn drop(&mut self) {
306 if !self.inner.closed.load(Ordering::Acquire) {
307 // user did not wait for outstanding connections to finish!
308 // this is not good -- we won't be able to shut down our connections cleanly
309 // all we can do is try to ensure a clean shutdown
310 self.inner.close.store(true, Ordering::SeqCst);
311 }
312 }
313}