mysql_async/conn/pool/futures/
get_conn.rs
1use std::{
10 fmt,
11 future::Future,
12 pin::Pin,
13 task::{Context, Poll},
14};
15
16use futures_core::ready;
17#[cfg(feature = "tracing")]
18use {
19 std::sync::Arc,
20 tracing::{debug_span, Span},
21};
22
23use crate::{
24 conn::{
25 pool::{Pool, QueueId},
26 Conn,
27 },
28 error::*,
29};
30
31pub(crate) enum GetConnInner {
33 New,
34 Done,
35 Connecting(crate::BoxFuture<'static, Conn>),
37 Checking(crate::BoxFuture<'static, Conn>),
39}
40
41impl fmt::Debug for GetConnInner {
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 match self {
44 GetConnInner::New => f.debug_tuple("GetConnInner::New").finish(),
45 GetConnInner::Done => f.debug_tuple("GetConnInner::Done").finish(),
46 GetConnInner::Connecting(_) => f
47 .debug_tuple("GetConnInner::Connecting")
48 .field(&"<future>")
49 .finish(),
50 GetConnInner::Checking(_) => f
51 .debug_tuple("GetConnInner::Checking")
52 .field(&"<future>")
53 .finish(),
54 }
55 }
56}
57
58#[derive(Debug)]
60#[must_use = "futures do nothing unless you `.await` or poll them"]
61pub struct GetConn {
62 pub(crate) queue_id: QueueId,
63 pub(crate) pool: Option<Pool>,
64 pub(crate) inner: GetConnInner,
65 reset_upon_returning_to_a_pool: bool,
66 #[cfg(feature = "tracing")]
67 span: Arc<Span>,
68}
69
70impl GetConn {
71 pub(crate) fn new(pool: &Pool, reset_upon_returning_to_a_pool: bool) -> GetConn {
72 GetConn {
73 queue_id: QueueId::next(),
74 pool: Some(pool.clone()),
75 inner: GetConnInner::New,
76 reset_upon_returning_to_a_pool,
77 #[cfg(feature = "tracing")]
78 span: Arc::new(debug_span!("mysql_async::get_conn")),
79 }
80 }
81
82 fn pool_mut(&mut self) -> &mut Pool {
83 self.pool
84 .as_mut()
85 .expect("GetConn::poll polled after returning Async::Ready")
86 }
87
88 fn pool_take(&mut self) -> Pool {
89 self.pool
90 .take()
91 .expect("GetConn::poll polled after returning Async::Ready")
92 }
93}
94
95impl Future for GetConn {
98 type Output = Result<Conn>;
99
100 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
101 #[cfg(feature = "tracing")]
102 let span = self.span.clone();
103 #[cfg(feature = "tracing")]
104 let _span_guard = span.enter();
105 loop {
106 match self.inner {
107 GetConnInner::New => {
108 let queue_id = self.queue_id;
109 let next = ready!(self.pool_mut().poll_new_conn(cx, queue_id))?;
110 match next {
111 GetConnInner::Connecting(conn_fut) => {
112 self.inner = GetConnInner::Connecting(conn_fut);
113 }
114 GetConnInner::Checking(conn_fut) => {
115 self.inner = GetConnInner::Checking(conn_fut);
116 }
117 GetConnInner::Done => unreachable!(
118 "Pool::poll_new_conn never gives out already-consumed GetConns"
119 ),
120 GetConnInner::New => {
121 unreachable!("Pool::poll_new_conn never gives out GetConnInner::New")
122 }
123 }
124 }
125 GetConnInner::Done => {
126 unreachable!("GetConn::poll polled after returning Async::Ready");
127 }
128 GetConnInner::Connecting(ref mut f) => {
129 let result = ready!(Pin::new(f).poll(cx));
130 let pool = self.pool_take();
131
132 self.inner = GetConnInner::Done;
133
134 return match result {
135 Ok(mut c) => {
136 c.inner.pool = Some(pool);
137 c.inner.reset_upon_returning_to_a_pool =
138 self.reset_upon_returning_to_a_pool;
139 Poll::Ready(Ok(c))
140 }
141 Err(e) => {
142 pool.cancel_connection();
143 Poll::Ready(Err(e))
144 }
145 };
146 }
147 GetConnInner::Checking(ref mut f) => {
148 let result = ready!(Pin::new(f).poll(cx));
149 match result {
150 Ok(mut c) => {
151 self.inner = GetConnInner::Done;
152
153 let pool = self.pool_take();
154 c.inner.pool = Some(pool);
155 c.inner.reset_upon_returning_to_a_pool =
156 self.reset_upon_returning_to_a_pool;
157 return Poll::Ready(Ok(c));
158 }
159 Err(_) => {
160 self.inner = GetConnInner::New;
162
163 let pool = self.pool_mut();
164 pool.cancel_connection();
165 continue;
166 }
167 }
168 }
169 }
170 }
171 }
172}
173
174impl Drop for GetConn {
175 fn drop(&mut self) {
176 if let Some(pool) = self.pool.take() {
179 pool.unqueue(self.queue_id);
182 if let GetConnInner::Connecting(..) = self.inner {
183 pool.cancel_connection();
184 }
185 }
186 }
187}