postgres/
notifications.rs

1//! Asynchronous notifications.
2
3use crate::connection::ConnectionRef;
4use crate::{Error, Notification};
5use fallible_iterator::FallibleIterator;
6use futures_util::{ready, FutureExt};
7use std::pin::Pin;
8use std::task::Poll;
9use std::time::Duration;
10use tokio::time::{self, Instant, Sleep};
11
12/// Notifications from a PostgreSQL backend.
13pub struct Notifications<'a> {
14    connection: ConnectionRef<'a>,
15}
16
17impl<'a> Notifications<'a> {
18    pub(crate) fn new(connection: ConnectionRef<'a>) -> Notifications<'a> {
19        Notifications { connection }
20    }
21
22    /// Returns the number of already buffered pending notifications.
23    pub fn len(&self) -> usize {
24        self.connection.notifications().len()
25    }
26
27    /// Determines if there are any already buffered pending notifications.
28    pub fn is_empty(&self) -> bool {
29        self.connection.notifications().is_empty()
30    }
31
32    /// Returns a nonblocking iterator over notifications.
33    ///
34    /// If there are no already buffered pending notifications, this iterator will poll the connection but will not
35    /// block waiting on notifications over the network. A return value of `None` either indicates that there are no
36    /// pending notifications or that the server has disconnected.
37    ///
38    /// # Note
39    ///
40    /// This iterator may start returning `Some` after previously returning `None` if more notifications are received.
41    pub fn iter(&mut self) -> Iter<'_> {
42        Iter {
43            connection: self.connection.as_ref(),
44        }
45    }
46
47    /// Returns a blocking iterator over notifications.
48    ///
49    /// If there are no already buffered pending notifications, this iterator will block indefinitely waiting on the
50    /// PostgreSQL backend server to send one. It will only return `None` if the server has disconnected.
51    pub fn blocking_iter(&mut self) -> BlockingIter<'_> {
52        BlockingIter {
53            connection: self.connection.as_ref(),
54        }
55    }
56
57    /// Returns an iterator over notifications which blocks a limited amount of time.
58    ///
59    /// If there are no already buffered pending notifications, this iterator will block waiting on the PostgreSQL
60    /// backend server to send one up to the provided timeout. A return value of `None` either indicates that there are
61    /// no pending notifications or that the server has disconnected.
62    ///
63    /// # Note
64    ///
65    /// This iterator may start returning `Some` after previously returning `None` if more notifications are received.
66    pub fn timeout_iter(&mut self, timeout: Duration) -> TimeoutIter<'_> {
67        TimeoutIter {
68            delay: Box::pin(self.connection.enter(|| time::sleep(timeout))),
69            timeout,
70            connection: self.connection.as_ref(),
71        }
72    }
73}
74
75/// A nonblocking iterator over pending notifications.
76pub struct Iter<'a> {
77    connection: ConnectionRef<'a>,
78}
79
80impl<'a> FallibleIterator for Iter<'a> {
81    type Item = Notification;
82    type Error = Error;
83
84    fn next(&mut self) -> Result<Option<Self::Item>, Self::Error> {
85        if let Some(notification) = self.connection.notifications_mut().pop_front() {
86            return Ok(Some(notification));
87        }
88
89        self.connection
90            .poll_block_on(|_, notifications, _| Poll::Ready(Ok(notifications.pop_front())))
91    }
92
93    fn size_hint(&self) -> (usize, Option<usize>) {
94        (self.connection.notifications().len(), None)
95    }
96}
97
98/// A blocking iterator over pending notifications.
99pub struct BlockingIter<'a> {
100    connection: ConnectionRef<'a>,
101}
102
103impl<'a> FallibleIterator for BlockingIter<'a> {
104    type Item = Notification;
105    type Error = Error;
106
107    fn next(&mut self) -> Result<Option<Self::Item>, Self::Error> {
108        if let Some(notification) = self.connection.notifications_mut().pop_front() {
109            return Ok(Some(notification));
110        }
111
112        self.connection
113            .poll_block_on(|_, notifications, done| match notifications.pop_front() {
114                Some(notification) => Poll::Ready(Ok(Some(notification))),
115                None if done => Poll::Ready(Ok(None)),
116                None => Poll::Pending,
117            })
118    }
119
120    fn size_hint(&self) -> (usize, Option<usize>) {
121        (self.connection.notifications().len(), None)
122    }
123}
124
125/// A time-limited blocking iterator over pending notifications.
126pub struct TimeoutIter<'a> {
127    connection: ConnectionRef<'a>,
128    delay: Pin<Box<Sleep>>,
129    timeout: Duration,
130}
131
132impl<'a> FallibleIterator for TimeoutIter<'a> {
133    type Item = Notification;
134    type Error = Error;
135
136    fn next(&mut self) -> Result<Option<Self::Item>, Self::Error> {
137        if let Some(notification) = self.connection.notifications_mut().pop_front() {
138            self.delay.as_mut().reset(Instant::now() + self.timeout);
139            return Ok(Some(notification));
140        }
141
142        let delay = &mut self.delay;
143        let timeout = self.timeout;
144        self.connection.poll_block_on(|cx, notifications, done| {
145            match notifications.pop_front() {
146                Some(notification) => {
147                    delay.as_mut().reset(Instant::now() + timeout);
148                    return Poll::Ready(Ok(Some(notification)));
149                }
150                None if done => return Poll::Ready(Ok(None)),
151                None => {}
152            }
153
154            ready!(delay.poll_unpin(cx));
155            Poll::Ready(Ok(None))
156        })
157    }
158
159    fn size_hint(&self) -> (usize, Option<usize>) {
160        (self.connection.notifications().len(), None)
161    }
162}