mysql_async/conn/pool/futures/
disconnect_pool.rs1use std::{
10 future::Future,
11 pin::Pin,
12 task::{Context, Poll},
13};
14
15use futures_core::ready;
16use tokio::sync::mpsc::UnboundedSender;
17
18use crate::{
19 conn::pool::{Inner, Pool, QUEUE_END_ID},
20 error::Error,
21 Conn,
22};
23
24use std::sync::{atomic, Arc};
25
26#[derive(Debug)]
31#[must_use = "futures do nothing unless you `.await` or poll them"]
32pub struct DisconnectPool {
33 pool_inner: Arc<Inner>,
34 drop: Option<UnboundedSender<Option<Conn>>>,
35}
36
37impl DisconnectPool {
38 pub(crate) fn new(pool: Pool) -> Self {
39 Self {
40 pool_inner: pool.inner,
41 drop: Some(pool.drop),
42 }
43 }
44}
45
46impl Future for DisconnectPool {
47 type Output = Result<(), Error>;
48
49 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
50 self.pool_inner.close.store(true, atomic::Ordering::Release);
51 let mut exchange = self.pool_inner.exchange.lock().unwrap();
52 exchange.spawn_futures_if_needed(&self.pool_inner);
53 exchange.waiting.push(cx.waker().clone(), QUEUE_END_ID);
54 drop(exchange);
55
56 if self.pool_inner.closed.load(atomic::Ordering::Acquire) {
57 Poll::Ready(Ok(()))
58 } else {
59 match self.drop.take() {
60 Some(drop) => match drop.send(None) {
61 Ok(_) => {
62 ready!(Box::pin(drop.closed()).as_mut().poll(cx));
64 Poll::Ready(Ok(()))
65 }
66 Err(_) => {
67 Poll::Ready(Ok(()))
69 }
70 },
71 None => Poll::Pending,
72 }
73 }
74 }
75}