use crate::connection::ConnectionRef;
use crate::{Error, Notification};
use fallible_iterator::FallibleIterator;
use futures_util::{ready, FutureExt};
use std::pin::Pin;
use std::task::Poll;
use std::time::Duration;
use tokio::time::{self, Instant, Sleep};
pub struct Notifications<'a> {
connection: ConnectionRef<'a>,
}
impl<'a> Notifications<'a> {
pub(crate) fn new(connection: ConnectionRef<'a>) -> Notifications<'a> {
Notifications { connection }
}
pub fn len(&self) -> usize {
self.connection.notifications().len()
}
pub fn is_empty(&self) -> bool {
self.connection.notifications().is_empty()
}
pub fn iter(&mut self) -> Iter<'_> {
Iter {
connection: self.connection.as_ref(),
}
}
pub fn blocking_iter(&mut self) -> BlockingIter<'_> {
BlockingIter {
connection: self.connection.as_ref(),
}
}
pub fn timeout_iter(&mut self, timeout: Duration) -> TimeoutIter<'_> {
TimeoutIter {
delay: Box::pin(self.connection.enter(|| time::sleep(timeout))),
timeout,
connection: self.connection.as_ref(),
}
}
}
pub struct Iter<'a> {
connection: ConnectionRef<'a>,
}
impl<'a> FallibleIterator for Iter<'a> {
type Item = Notification;
type Error = Error;
fn next(&mut self) -> Result<Option<Self::Item>, Self::Error> {
if let Some(notification) = self.connection.notifications_mut().pop_front() {
return Ok(Some(notification));
}
self.connection
.poll_block_on(|_, notifications, _| Poll::Ready(Ok(notifications.pop_front())))
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.connection.notifications().len(), None)
}
}
pub struct BlockingIter<'a> {
connection: ConnectionRef<'a>,
}
impl<'a> FallibleIterator for BlockingIter<'a> {
type Item = Notification;
type Error = Error;
fn next(&mut self) -> Result<Option<Self::Item>, Self::Error> {
if let Some(notification) = self.connection.notifications_mut().pop_front() {
return Ok(Some(notification));
}
self.connection
.poll_block_on(|_, notifications, done| match notifications.pop_front() {
Some(notification) => Poll::Ready(Ok(Some(notification))),
None if done => Poll::Ready(Ok(None)),
None => Poll::Pending,
})
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.connection.notifications().len(), None)
}
}
pub struct TimeoutIter<'a> {
connection: ConnectionRef<'a>,
delay: Pin<Box<Sleep>>,
timeout: Duration,
}
impl<'a> FallibleIterator for TimeoutIter<'a> {
type Item = Notification;
type Error = Error;
fn next(&mut self) -> Result<Option<Self::Item>, Self::Error> {
if let Some(notification) = self.connection.notifications_mut().pop_front() {
self.delay.as_mut().reset(Instant::now() + self.timeout);
return Ok(Some(notification));
}
let delay = &mut self.delay;
let timeout = self.timeout;
self.connection.poll_block_on(|cx, notifications, done| {
match notifications.pop_front() {
Some(notification) => {
delay.as_mut().reset(Instant::now() + timeout);
return Poll::Ready(Ok(Some(notification)));
}
None if done => return Poll::Ready(Ok(None)),
None => {}
}
ready!(delay.poll_unpin(cx));
Poll::Ready(Ok(None))
})
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.connection.notifications().len(), None)
}
}