// postgres/notifications.rs
use crate::connection::ConnectionRef;
use crate::{Error, Notification};
use fallible_iterator::FallibleIterator;
use futures_util::{ready, FutureExt};
use std::pin::Pin;
use std::task::Poll;
use std::time::Duration;
use tokio::time::{self, Instant, Sleep};
11
12pub struct Notifications<'a> {
14 connection: ConnectionRef<'a>,
15}
16
17impl<'a> Notifications<'a> {
18 pub(crate) fn new(connection: ConnectionRef<'a>) -> Notifications<'a> {
19 Notifications { connection }
20 }
21
22 pub fn len(&self) -> usize {
24 self.connection.notifications().len()
25 }
26
27 pub fn is_empty(&self) -> bool {
29 self.connection.notifications().is_empty()
30 }
31
32 pub fn iter(&mut self) -> Iter<'_> {
42 Iter {
43 connection: self.connection.as_ref(),
44 }
45 }
46
47 pub fn blocking_iter(&mut self) -> BlockingIter<'_> {
52 BlockingIter {
53 connection: self.connection.as_ref(),
54 }
55 }
56
57 pub fn timeout_iter(&mut self, timeout: Duration) -> TimeoutIter<'_> {
67 TimeoutIter {
68 delay: Box::pin(self.connection.enter(|| time::sleep(timeout))),
69 timeout,
70 connection: self.connection.as_ref(),
71 }
72 }
73}
74
75pub struct Iter<'a> {
77 connection: ConnectionRef<'a>,
78}
79
80impl<'a> FallibleIterator for Iter<'a> {
81 type Item = Notification;
82 type Error = Error;
83
84 fn next(&mut self) -> Result<Option<Self::Item>, Self::Error> {
85 if let Some(notification) = self.connection.notifications_mut().pop_front() {
86 return Ok(Some(notification));
87 }
88
89 self.connection
90 .poll_block_on(|_, notifications, _| Poll::Ready(Ok(notifications.pop_front())))
91 }
92
93 fn size_hint(&self) -> (usize, Option<usize>) {
94 (self.connection.notifications().len(), None)
95 }
96}
97
98pub struct BlockingIter<'a> {
100 connection: ConnectionRef<'a>,
101}
102
103impl<'a> FallibleIterator for BlockingIter<'a> {
104 type Item = Notification;
105 type Error = Error;
106
107 fn next(&mut self) -> Result<Option<Self::Item>, Self::Error> {
108 if let Some(notification) = self.connection.notifications_mut().pop_front() {
109 return Ok(Some(notification));
110 }
111
112 self.connection
113 .poll_block_on(|_, notifications, done| match notifications.pop_front() {
114 Some(notification) => Poll::Ready(Ok(Some(notification))),
115 None if done => Poll::Ready(Ok(None)),
116 None => Poll::Pending,
117 })
118 }
119
120 fn size_hint(&self) -> (usize, Option<usize>) {
121 (self.connection.notifications().len(), None)
122 }
123}
124
125pub struct TimeoutIter<'a> {
127 connection: ConnectionRef<'a>,
128 delay: Pin<Box<Sleep>>,
129 timeout: Duration,
130}
131
132impl<'a> FallibleIterator for TimeoutIter<'a> {
133 type Item = Notification;
134 type Error = Error;
135
136 fn next(&mut self) -> Result<Option<Self::Item>, Self::Error> {
137 if let Some(notification) = self.connection.notifications_mut().pop_front() {
138 self.delay.as_mut().reset(Instant::now() + self.timeout);
139 return Ok(Some(notification));
140 }
141
142 let delay = &mut self.delay;
143 let timeout = self.timeout;
144 self.connection.poll_block_on(|cx, notifications, done| {
145 match notifications.pop_front() {
146 Some(notification) => {
147 delay.as_mut().reset(Instant::now() + timeout);
148 return Poll::Ready(Ok(Some(notification)));
149 }
150 None if done => return Poll::Ready(Ok(None)),
151 None => {}
152 }
153
154 ready!(delay.poll_unpin(cx));
155 Poll::Ready(Ok(None))
156 })
157 }
158
159 fn size_hint(&self) -> (usize, Option<usize>) {
160 (self.connection.notifications().len(), None)
161 }
162}