kube_runtime/controller/
future_hash_map.rs

1use futures::{Future, FutureExt, Stream};
2use std::{
3    collections::HashMap,
4    hash::Hash,
5    pin::Pin,
6    task::{Context, Poll},
7};
8
/// A keyed set of in-flight [`Future`]s, modelled on
/// [`tokio_stream::StreamMap`](https://docs.rs/tokio-stream/0.1.3/tokio_stream/struct.StreamMap.html)
/// but backed by a [`HashMap`], which makes insertion and membership checks (amortized) O(1).
///
/// As with `StreamMap`'s `S`, `F` is required to be [`Unpin`]: a [`HashMap`] may relocate its
/// entries whenever it likes (for example, when the backing array is resized).
///
/// NOTE: Unlike `StreamMap`, `FutureHashMap` makes *no* attempt to be fair. The order in which
/// futures are polled is arbitrary; it is generally stable for as long as the set of futures
/// is unchanged, but that should not be relied on!
#[derive(Debug)]
pub struct FutureHashMap<K, F> {
    futures: HashMap<K, F>,
}
22
23impl<K, F> Default for FutureHashMap<K, F> {
24    fn default() -> Self {
25        Self {
26            futures: HashMap::new(),
27        }
28    }
29}
30
31impl<K, F> FutureHashMap<K, F>
32where
33    K: Hash + Eq,
34{
35    /// Inserts `future` into the key `key`, returning the old future if there was one
36    pub fn insert(&mut self, key: K, future: F) -> Option<F> {
37        self.futures.insert(key, future)
38    }
39
40    pub fn contains_key(&self, key: &K) -> bool {
41        self.futures.contains_key(key)
42    }
43
44    pub fn len(&self) -> usize {
45        self.futures.len()
46    }
47}
48
49impl<K, F> Stream for FutureHashMap<K, F>
50where
51    K: Hash + Clone + Eq,
52    F: Future + Unpin,
53    Self: Unpin,
54{
55    type Item = F::Output;
56
57    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
58        let key_and_msg =
59            self.as_mut()
60                .futures
61                .iter_mut()
62                .find_map(|(key, future)| match future.poll_unpin(cx) {
63                    Poll::Ready(msg) => Some((key.clone(), msg)),
64                    Poll::Pending => None,
65                });
66        //dbg!((key_and_msg.is_some(), &self.futures.len()));
67        match key_and_msg {
68            Some((key, msg)) => {
69                self.as_mut().futures.remove(&key);
70                Poll::Ready(Some(msg))
71            }
72            None if self.futures.is_empty() => Poll::Ready(None),
73            None => Poll::Pending,
74        }
75    }
76}
77
#[cfg(test)]
mod tests {
    use std::{future, task::Poll};

    use super::FutureHashMap;
    use futures::{channel::mpsc, poll, StreamExt};

    /// Every inserted future's output must come out of the stream, after which the stream
    /// must terminate (otherwise `collect` would never finish).
    #[tokio::test]
    async fn fhm_should_forward_all_values_and_shut_down() {
        let mut fhm = FutureHashMap::default();
        let count: u16 = 100;
        (0..count).for_each(|i| {
            fhm.insert(i, future::ready(i));
        });

        let expected: Vec<u16> = (0..count).collect();
        let mut received: Vec<u16> = fhm.collect().await;
        // The emission order is arbitrary, so compare in sorted order.
        received.sort_unstable();
        assert_eq!(received, expected);
    }

    /// The stream must stay pending while any future is unfinished, and only end once the
    /// last one has completed.
    #[tokio::test]
    async fn fhm_should_stay_alive_until_all_sources_finish() {
        let mut fhm = FutureHashMap::default();
        let (first_tx, mut first_rx) = mpsc::unbounded::<()>();
        let (second_tx, mut second_rx) = mpsc::unbounded::<()>();
        fhm.insert(0, first_rx.next());
        fhm.insert(1, second_rx.next());

        // Both senders are still alive and nothing has been sent.
        assert_eq!(poll!(fhm.next()), Poll::Pending);

        // Closing the first channel completes its `next()` future with `None`.
        drop(first_tx);
        assert_eq!(poll!(fhm.next()), Poll::Ready(Some(None)));
        // The second future is still outstanding, so the stream must not end yet.
        assert_eq!(poll!(fhm.next()), Poll::Pending);

        // Closing the second channel completes the last future; then the stream itself ends.
        drop(second_tx);
        assert_eq!(poll!(fhm.next()), Poll::Ready(Some(None)));
        assert_eq!(poll!(fhm.next()), Poll::Ready(None));
    }
}