1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
// Copyright (c) 2016 Anatoly Ikorsky
//
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. All files in the project carrying such notice may not be copied,
// modified, or distributed except according to those terms.
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use futures_core::ready;
use tokio::sync::mpsc::UnboundedSender;
use crate::{
conn::pool::{Inner, Pool, QUEUE_END_ID},
error::Error,
Conn,
};
use std::sync::{atomic, Arc};
/// Future that disconnects this pool from a server and resolves to `()`.
///
/// **Note:** This Future won't resolve until all active connections, taken from it,
/// are dropped or disconnected. Also all pending and new `GetConn`'s will resolve to error.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct DisconnectPool {
// Shared state of the pool being disconnected (moved out of `Pool::inner`).
// `poll` sets its `close` flag, reads its `closed` flag and locks its `exchange`.
pool_inner: Arc<Inner>,
// Sender moved out of `Pool::drop`. `poll` takes it out of the `Option` the first
// time it is needed and sends `None` through it, so it is `None` afterwards.
// NOTE(review): presumably the receiving end is the pool's recycler task — confirm.
drop: Option<UnboundedSender<Option<Conn>>>,
}
impl DisconnectPool {
pub(crate) fn new(pool: Pool) -> Self {
Self {
pool_inner: pool.inner,
drop: Some(pool.drop),
}
}
}
impl Future for DisconnectPool {
    type Output = Result<(), Error>;

    /// Drives the pool shutdown.
    ///
    /// Every poll sets the `close` flag, lets the exchange spawn any futures it
    /// needs and queues the current waker under `QUEUE_END_ID`. The future is
    /// ready as soon as the `closed` flag is observed; otherwise the stored
    /// sender (if still present) is used to send `None` and to wait for the
    /// channel to close.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.pool_inner.close.store(true, atomic::Ordering::Release);

        {
            // The lock is held only for this scope, so it is released before the
            // `closed` flag is inspected below.
            let mut guard = self.pool_inner.exchange.lock().unwrap();
            guard.spawn_futures_if_needed(&self.pool_inner);
            guard.waiting.push(cx.waker().clone(), QUEUE_END_ID);
        }

        if self.pool_inner.closed.load(atomic::Ordering::Acquire) {
            return Poll::Ready(Ok(()));
        }

        // The sender is consumed on first use; later polls find `None` here and
        // stay pending until the `closed` flag is observed above.
        let sender = match self.drop.take() {
            Some(sender) => sender,
            None => return Poll::Pending,
        };

        if sender.send(None).is_err() {
            // Recycler seem dead. No one will wake us.
            return Poll::Ready(Ok(()));
        }

        // Recycler is alive. Waiting for it to finish.
        let mut channel_closed = Box::pin(sender.closed());
        ready!(channel_closed.as_mut().poll(cx));
        Poll::Ready(Ok(()))
    }
}