azure_core/
pageable.rs

1use futures::stream::unfold;
2use futures::Stream;
3
4/// Helper macro for unwrapping `Result`s into the right types
5/// that `futures::stream::unfold` expects.
6macro_rules! r#try {
7    ($expr:expr $(,)?) => {
8        match $expr {
9            Result::Ok(val) => val,
10            Result::Err(err) => {
11                return Some((Err(err.into()), State::Done));
12            }
13        }
14    };
15}
16
17/// Helper macro for declaring the `Pageable` and `Continuable` types which easily allows
18/// for conditionally compiling with a `Send` constraint or not.
19macro_rules! declare {
20    ($($extra:tt)*) => {
21        // The use of a module here is a hack to get around the fact that `pin_project`
22        // generates a method `project_ref` which is never used and generates a warning.
23        // The module allows us to declare that `dead_code` is allowed but only for
24        // the `Pageable` type.
25        mod pageable {
26            #![allow(dead_code)]
27            use super::*;
28            /// A pageable stream that yields items of type `T`
29            ///
30            /// Internally uses the Azure specific continuation header to
31            /// make repeated requests to Azure yielding a new page each time.
32            #[pin_project::pin_project]
33            // This is to surpress the unused `project_ref` warning
34            pub struct Pageable<T, E> {
35                #[pin]
36                pub(crate) stream: std::pin::Pin<Box<dyn Stream<Item = Result<T, E>> $($extra)*>>,
37            }
38        }
39        pub use pageable::Pageable;
40
41        impl<T, E> Pageable<T, E>
42        where
43            T: Continuable,
44        {
45            pub fn new<F>(
46                make_request: impl Fn(Option<T::Continuation>) -> F + Clone $($extra)* + 'static,
47            ) -> Self
48            where
49                F: std::future::Future<Output = Result<T, E>> $($extra)* + 'static,
50            {
51                let stream = unfold(State::Init, move |state: State<T::Continuation>| {
52                    let make_request = make_request.clone();
53                    async move {
54                        let response = match state {
55                            State::Init => {
56                                let request = make_request(None);
57                                r#try!(request.await)
58                            }
59                            State::Continuation(token) => {
60                                let request = make_request(Some(token));
61                                r#try!(request.await)
62                            }
63                            State::Done => {
64                                return None;
65                            }
66                        };
67
68                        let next_state = response
69                            .continuation()
70                            .map_or(State::Done, State::Continuation);
71
72                        Some((Ok(response), next_state))
73                    }
74                });
75                Self {
76                    stream: Box::pin(stream),
77                }
78            }
79        }
80
81        /// A type that can yield an optional continuation token
82        pub trait Continuable {
83            type Continuation: 'static $($extra)*;
84            fn continuation(&self) -> Option<Self::Continuation>;
85        }
86    };
87}
88
89#[cfg(not(target_arch = "wasm32"))]
90declare!(+ Send);
91#[cfg(target_arch = "wasm32")]
92declare!();
93
94impl<T, E> Stream for Pageable<T, E> {
95    type Item = Result<T, E>;
96
97    fn poll_next(
98        self: std::pin::Pin<&mut Self>,
99        cx: &mut std::task::Context<'_>,
100    ) -> std::task::Poll<Option<Self::Item>> {
101        let this = self.project();
102        this.stream.poll_next(cx)
103    }
104}
105
106impl<T, O> std::fmt::Debug for Pageable<T, O> {
107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108        f.debug_struct("Pageable").finish_non_exhaustive()
109    }
110}
111
112#[derive(Debug, Clone, PartialEq, Eq)]
113enum State<T> {
114    Init,
115    Continuation(T),
116    Done,
117}