azure_core/
pageable.rs
1use futures::stream::unfold;
2use futures::Stream;
3
/// Unwraps a `Result` inside a function or async block that returns
/// `Option<(Result<_, _>, State<_>)>`.
///
/// On `Ok` the macro evaluates to the contained value. On `Err` it returns
/// early from the enclosing body with the error (converted via `Into`) paired
/// with `State::Done`. `State` is resolved at the call site.
macro_rules! r#try {
    ($expr:expr $(,)?) => {
        match $expr {
            Result::Err(failure) => {
                return Some((Err(failure.into()), State::Done));
            }
            Result::Ok(value) => value,
        }
    };
}
16
/// Declares the `Pageable` stream type, its constructor and the `Continuable`
/// trait.
///
/// The tokens passed to the macro are spliced in as extra bounds on the boxed
/// stream, on the request closure, on the request future and on
/// `Continuable::Continuation`. The invocations in this file pass either
/// `+ Send` or nothing.
macro_rules! declare {
    ($($extra:tt)*) => {
        mod pageable {
            // NOTE(review): presumably this silences warnings about unused items
            // generated by `pin_project` for the struct below — confirm.
            #![allow(dead_code)]
            use super::*;

            /// A stream of `Result<T, E>` items backed by a boxed, pinned inner stream.
            #[pin_project::pin_project]
            pub struct Pageable<T, E> {
                #[pin]
                pub(crate) stream: std::pin::Pin<Box<dyn Stream<Item = Result<T, E>> $($extra)*>>,
            }
        }
        pub use pageable::Pageable;

        impl<T, E> Pageable<T, E>
        where
            T: Continuable,
        {
            /// Builds a `Pageable` that drives `make_request` repeatedly.
            ///
            /// The first call receives `None`. Each later call receives the
            /// continuation reported by the previous response. The stream ends
            /// once a response reports no continuation, or right after the
            /// first error has been yielded.
            pub fn new<F>(
                make_request: impl Fn(Option<T::Continuation>) -> F + Clone $($extra)* + 'static,
            ) -> Self
            where
                F: std::future::Future<Output = Result<T, E>> $($extra)* + 'static,
            {
                let pages = unfold(State::Init, move |state: State<T::Continuation>| {
                    // Each step owns its own clone of the request factory.
                    let fetch = make_request.clone();
                    async move {
                        // Pick the argument for this step, or stop if the
                        // previous step marked the stream as finished.
                        let token = match state {
                            State::Init => None,
                            State::Continuation(token) => Some(token),
                            State::Done => return None,
                        };

                        // The future is created in its own statement and only
                        // then awaited, as separate steps.
                        let request = fetch(token);
                        let page = r#try!(request.await);

                        let next_state = match page.continuation() {
                            Some(token) => State::Continuation(token),
                            None => State::Done,
                        };

                        Some((Ok(page), next_state))
                    }
                });

                Self {
                    stream: Box::pin(pages),
                }
            }
        }

        /// A response that can report the continuation to use for the next request.
        pub trait Continuable {
            /// Value handed back to the request closure to obtain the next response.
            type Continuation: 'static $($extra)*;

            /// Returns the continuation for the next request, or `None` when
            /// there is nothing further to request.
            fn continuation(&self) -> Option<Self::Continuation>;
        }
    };
}
88
// Expand `declare!` exactly once, depending on the target: every target other
// than `wasm32` passes `+ Send` as the macro's extra bound tokens, while
// `wasm32` passes no extra bounds.
#[cfg(not(target_arch = "wasm32"))]
declare!(+ Send);
#[cfg(target_arch = "wasm32")]
declare!();
93
94impl<T, E> Stream for Pageable<T, E> {
95 type Item = Result<T, E>;
96
97 fn poll_next(
98 self: std::pin::Pin<&mut Self>,
99 cx: &mut std::task::Context<'_>,
100 ) -> std::task::Poll<Option<Self::Item>> {
101 let this = self.project();
102 this.stream.poll_next(cx)
103 }
104}
105
106impl<T, O> std::fmt::Debug for Pageable<T, O> {
107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108 f.debug_struct("Pageable").finish_non_exhaustive()
109 }
110}
111
/// Progress of the paging loop that feeds a `Pageable`.
#[derive(Clone, Debug, Eq, PartialEq)]
enum State<T> {
    /// No request has been made yet; the first request gets no continuation.
    Init,
    /// The previous response reported this continuation for the next request.
    Continuation(T),
    /// Nothing further will be requested; the stream ends.
    Done,
}