mysql_async/conn/pool/futures/
get_conn.rs

1// Copyright (c) 2016 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use std::{
10    fmt,
11    future::Future,
12    pin::Pin,
13    task::{Context, Poll},
14};
15
16use futures_core::ready;
17#[cfg(feature = "tracing")]
18use {
19    std::sync::Arc,
20    tracing::{debug_span, Span},
21};
22
23use crate::{
24    conn::{
25        pool::{Pool, QueueId},
26        Conn,
27    },
28    error::*,
29};
30
/// States of the [`GetConn`] future.
///
/// A `GetConn` is created in [`GetConnInner::New`], is switched to
/// [`GetConnInner::Connecting`] or [`GetConnInner::Checking`] according to
/// what `Pool::poll_new_conn` returns, and is left in [`GetConnInner::Done`]
/// once it has resolved.
pub(crate) enum GetConnInner {
    /// Initial state: the pool has not handed out a connection future yet.
    New,
    /// Terminal state, set right before the future resolves. Polling a
    /// `GetConn` in this state is a bug and panics.
    Done,
    // TODO: one day this should be an existential
    /// A connection future handed out by the pool whose outcome (success or
    /// error) is returned to the caller as-is.
    /// Presumably this is a brand-new connection being established — confirm
    /// against `Pool::poll_new_conn`.
    Connecting(crate::BoxFuture<'static, Conn>),
    /// This future will check that an idling connection is alive.
    /// If the check fails, `GetConn` goes back to [`GetConnInner::New`] and
    /// asks the pool again instead of surfacing the error.
    Checking(crate::BoxFuture<'static, Conn>),
}
40
41impl fmt::Debug for GetConnInner {
42    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43        match self {
44            GetConnInner::New => f.debug_tuple("GetConnInner::New").finish(),
45            GetConnInner::Done => f.debug_tuple("GetConnInner::Done").finish(),
46            GetConnInner::Connecting(_) => f
47                .debug_tuple("GetConnInner::Connecting")
48                .field(&"<future>")
49                .finish(),
50            GetConnInner::Checking(_) => f
51                .debug_tuple("GetConnInner::Checking")
52                .field(&"<future>")
53                .finish(),
54        }
55    }
56}
57
/// This future will take connection from a pool and resolve to [`Conn`].
///
/// Dropping the future before it resolves removes it from the pool's queue
/// (see the `Drop` implementation).
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct GetConn {
    /// Identifier obtained from `QueueId::next()`; passed to the pool when
    /// polling for a connection and when unqueueing on drop.
    pub(crate) queue_id: QueueId,
    /// Handle to the pool. It is `Some` until the future resolves with a
    /// connection-future result (at which point it is taken out) or until
    /// the future is dropped.
    pub(crate) pool: Option<Pool>,
    /// Current state of the future.
    pub(crate) inner: GetConnInner,
    /// Copied into the resolved connection's
    /// `inner.reset_upon_returning_to_a_pool` flag.
    reset_upon_returning_to_a_pool: bool,
    /// Tracing span that is entered for the duration of every `poll` call.
    #[cfg(feature = "tracing")]
    span: Arc<Span>,
}
69
70impl GetConn {
71    pub(crate) fn new(pool: &Pool, reset_upon_returning_to_a_pool: bool) -> GetConn {
72        GetConn {
73            queue_id: QueueId::next(),
74            pool: Some(pool.clone()),
75            inner: GetConnInner::New,
76            reset_upon_returning_to_a_pool,
77            #[cfg(feature = "tracing")]
78            span: Arc::new(debug_span!("mysql_async::get_conn")),
79        }
80    }
81
82    fn pool_mut(&mut self) -> &mut Pool {
83        self.pool
84            .as_mut()
85            .expect("GetConn::poll polled after returning Async::Ready")
86    }
87
88    fn pool_take(&mut self) -> Pool {
89        self.pool
90            .take()
91            .expect("GetConn::poll polled after returning Async::Ready")
92    }
93}
94
// this manual implementation of Future may seem stupid, but we sort
// of need it to get the dropping behavior we want.
//
// State machine overview (see `GetConnInner`):
//   New        -> ask the pool; it answers with `Connecting` or `Checking`
//   Connecting -> resolve with whatever the connection future yields
//   Checking   -> resolve on success, fall back to `New` on failure
//   Done       -> polling again is a bug
impl Future for GetConn {
    type Output = Result<Conn>;

    /// Drives the state machine until it either resolves or one of the
    /// inner polls returns `Poll::Pending`.
    ///
    /// # Panics
    ///
    /// Panics if polled again after having returned a result produced by a
    /// `Connecting` or `Checking` future.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // The span handle is cloned so that the guard borrows a local
        // rather than `self`, which is mutated below.
        #[cfg(feature = "tracing")]
        let span = self.span.clone();
        #[cfg(feature = "tracing")]
        let _span_guard = span.enter();
        loop {
            match self.inner {
                GetConnInner::New => {
                    let queue_id = self.queue_id;
                    // `ready!` returns `Pending` early; `?` returns a pool
                    // error as `Poll::Ready(Err(..))`. In both cases the
                    // state stays `New` and the pool handle stays in place.
                    let next = ready!(self.pool_mut().poll_new_conn(cx, queue_id))?;
                    match next {
                        GetConnInner::Connecting(conn_fut) => {
                            self.inner = GetConnInner::Connecting(conn_fut);
                        }
                        GetConnInner::Checking(conn_fut) => {
                            self.inner = GetConnInner::Checking(conn_fut);
                        }
                        GetConnInner::Done => unreachable!(
                            "Pool::poll_new_conn never gives out already-consumed GetConns"
                        ),
                        GetConnInner::New => {
                            unreachable!("Pool::poll_new_conn never gives out GetConnInner::New")
                        }
                    }
                    // Fall through to the next loop iteration so the freshly
                    // stored future is polled right away.
                }
                GetConnInner::Done => {
                    unreachable!("GetConn::poll polled after returning Async::Ready");
                }
                GetConnInner::Connecting(ref mut f) => {
                    let result = ready!(Pin::new(f).poll(cx));
                    // The future is finished either way: take the pool handle
                    // out (so `Drop` has nothing left to do) and mark as done.
                    let pool = self.pool_take();

                    self.inner = GetConnInner::Done;

                    return match result {
                        Ok(mut c) => {
                            // Hand the pool handle and the reset flag over to
                            // the connection before giving it to the caller.
                            c.inner.pool = Some(pool);
                            c.inner.reset_upon_returning_to_a_pool =
                                self.reset_upon_returning_to_a_pool;
                            Poll::Ready(Ok(c))
                        }
                        Err(e) => {
                            // Tell the pool this connection attempt is gone,
                            // then surface the error.
                            pool.cancel_connection();
                            Poll::Ready(Err(e))
                        }
                    };
                }
                GetConnInner::Checking(ref mut f) => {
                    let result = ready!(Pin::new(f).poll(cx));
                    match result {
                        Ok(mut c) => {
                            self.inner = GetConnInner::Done;

                            // Same hand-over as in the `Connecting` success path.
                            let pool = self.pool_take();
                            c.inner.pool = Some(pool);
                            c.inner.reset_upon_returning_to_a_pool =
                                self.reset_upon_returning_to_a_pool;
                            return Poll::Ready(Ok(c));
                        }
                        Err(_) => {
                            // Idling connection is broken. We'll drop it and try again.
                            // The error itself is discarded; the pool handle is
                            // kept because the future is not finished yet.
                            self.inner = GetConnInner::New;

                            let pool = self.pool_mut();
                            pool.cancel_connection();
                            continue;
                        }
                    }
                }
            }
        }
    }
}
173
174impl Drop for GetConn {
175    fn drop(&mut self) {
176        // We drop a connection before it can be resolved, a.k.a. cancelling it.
177        // Make sure we maintain the necessary invariants towards the pool.
178        if let Some(pool) = self.pool.take() {
179            // Remove the waker from the pool's waitlist in case this task was
180            // woken by another waker, like from tokio::time::timeout.
181            pool.unqueue(self.queue_id);
182            if let GetConnInner::Connecting(..) = self.inner {
183                pool.cancel_connection();
184            }
185        }
186    }
187}