mysql_async/conn/pool/futures/
disconnect_pool.rs

1// Copyright (c) 2016 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use std::{
10    future::Future,
11    pin::Pin,
12    task::{Context, Poll},
13};
14
15use futures_core::ready;
16use tokio::sync::mpsc::UnboundedSender;
17
18use crate::{
19    conn::pool::{Inner, Pool, QUEUE_END_ID},
20    error::Error,
21    Conn,
22};
23
24use std::sync::{atomic, Arc};
25
26/// Future that disconnects this pool from a server and resolves to `()`.
27///
28/// **Note:** This Future won't resolve until all active connections, taken from it,
29/// are dropped or disonnected. Also all pending and new `GetConn`'s will resolve to error.
30#[derive(Debug)]
31#[must_use = "futures do nothing unless you `.await` or poll them"]
32pub struct DisconnectPool {
33    pool_inner: Arc<Inner>,
34    drop: Option<UnboundedSender<Option<Conn>>>,
35}
36
37impl DisconnectPool {
38    pub(crate) fn new(pool: Pool) -> Self {
39        Self {
40            pool_inner: pool.inner,
41            drop: Some(pool.drop),
42        }
43    }
44}
45
46impl Future for DisconnectPool {
47    type Output = Result<(), Error>;
48
49    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
50        self.pool_inner.close.store(true, atomic::Ordering::Release);
51        let mut exchange = self.pool_inner.exchange.lock().unwrap();
52        exchange.spawn_futures_if_needed(&self.pool_inner);
53        exchange.waiting.push(cx.waker().clone(), QUEUE_END_ID);
54        drop(exchange);
55
56        if self.pool_inner.closed.load(atomic::Ordering::Acquire) {
57            Poll::Ready(Ok(()))
58        } else {
59            match self.drop.take() {
60                Some(drop) => match drop.send(None) {
61                    Ok(_) => {
62                        // Recycler is alive. Waiting for it to finish.
63                        ready!(Box::pin(drop.closed()).as_mut().poll(cx));
64                        Poll::Ready(Ok(()))
65                    }
66                    Err(_) => {
67                        // Recycler seem dead. No one will wake us.
68                        Poll::Ready(Ok(()))
69                    }
70                },
71                None => Poll::Pending,
72            }
73        }
74    }
75}