// Source: kube_runtime/controller/future_hash_map.rs

use futures::{Future, FutureExt, Stream};
use std::{
    collections::HashMap,
    hash::Hash,
    pin::Pin,
    task::{Context, Poll},
};

/// Variant of [`tokio_stream::StreamMap`](https://docs.rs/tokio-stream/0.1.3/tokio_stream/struct.StreamMap.html)
/// that only runs [`Future`]s, and uses a [`HashMap`] as the backing store, giving (amortized) O(1) insertion
/// and membership checks.
///
/// When polled as a stream, each future's output is yielded once that future completes, at which
/// point the future is removed from the map. Polling a map that holds no futures yields `None`.
///
/// Just like for `StreamMap`'s `S`, `F` must be [`Unpin`], since [`HashMap`] is free to move
/// entries as it pleases (for example: when resizing the backing array).
///
/// NOTE: Contrary to `StreamMap`, `FutureHashMap` does *not* try to be fair. The polling order
/// is arbitrary, but generally stable for as long as the set of futures is unchanged (although
/// this should not be relied on!).
#[derive(Debug)]
pub struct FutureHashMap<K, F> {
    /// Futures that have not yet been polled to completion, stored under their caller-supplied key.
    futures: HashMap<K, F>,
}

impl<K, F> Default for FutureHashMap<K, F> {
    /// Creates an empty map with no futures registered.
    ///
    /// A manual impl (rather than `#[derive(Default)]`) keeps `K` and `F` free of any
    /// [`Default`] bound.
    fn default() -> Self {
        let futures = HashMap::default();
        Self { futures }
    }
}

impl<K, F> FutureHashMap<K, F>
where
    K: Hash + Eq,
{
    /// Registers `future` under `key`.
    ///
    /// If `key` was already occupied, the future previously stored there is replaced and handed
    /// back to the caller; otherwise `None` is returned.
    pub fn insert(&mut self, key: K, future: F) -> Option<F> {
        let Self { futures } = self;
        futures.insert(key, future)
    }

    /// Returns `true` if a future is currently stored under `key`.
    pub fn contains_key(&self, key: &K) -> bool {
        let Self { futures } = self;
        futures.contains_key(key)
    }

    /// Returns the number of futures currently stored in the map.
    pub fn len(&self) -> usize {
        let Self { futures } = self;
        futures.len()
    }
}

impl<K, F> Stream for FutureHashMap<K, F>
where
    K: Hash + Clone + Eq,
    F: Future + Unpin,
    Self: Unpin,
{
    type Item = F::Output;

    /// Polls the stored futures in the map's iteration order and yields the output of the first
    /// one found to be ready, removing that future from the map.
    ///
    /// Returns `Poll::Ready(None)` if nothing completed and the map is empty, and
    /// `Poll::Pending` if nothing completed but futures remain.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `Self: Unpin`, so the pin can be unwrapped into a plain mutable reference.
        let this = self.get_mut();

        // Stop at the first ready future. The key is cloned because the entry cannot be removed
        // while the map is still mutably borrowed by the iteration.
        let mut completed = None;
        for (key, future) in this.futures.iter_mut() {
            if let Poll::Ready(output) = future.poll_unpin(cx) {
                completed = Some((key.clone(), output));
                break;
            }
        }

        if let Some((key, output)) = completed {
            this.futures.remove(&key);
            return Poll::Ready(Some(output));
        }
        if this.futures.is_empty() {
            Poll::Ready(None)
        } else {
            Poll::Pending
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{future, task::Poll};

    use super::FutureHashMap;
    use futures::{channel::mpsc, poll, StreamExt};

    // Every inserted (already completed) future must have its output yielded exactly once,
    // after which the stream must end so that `collect` can return.
    #[tokio::test]
    async fn fhm_should_forward_all_values_and_shut_down() {
        let mut fhm = FutureHashMap::default();
        let count = 100;
        for i in 0..count {
            fhm.insert(i, future::ready(i));
        }
        let mut values = fhm.collect::<Vec<u16>>().await;
        // The yield order is arbitrary, so sort before comparing against the expected range.
        values.sort_unstable();
        assert_eq!(values, (0..count).collect::<Vec<u16>>());
    }

    // The stream must keep reporting `Pending` (rather than ending) while any future is still
    // outstanding, and end only once the last one has completed.
    #[tokio::test]
    async fn fhm_should_stay_alive_until_all_sources_finish() {
        let mut fhm = FutureHashMap::default();
        let (tx0, mut rx0) = mpsc::unbounded::<()>();
        let (tx1, mut rx1) = mpsc::unbounded::<()>();
        fhm.insert(0, rx0.next());
        fhm.insert(1, rx1.next());
        // Nothing has been sent and both senders are alive, so neither future is ready.
        assert_eq!(poll!(fhm.next()), Poll::Pending);
        // Dropping the sender lets `rx0.next()` resolve to `None`, which is forwarded as an item.
        drop(tx0);
        assert_eq!(poll!(fhm.next()), Poll::Ready(Some(None)));
        // The second future is still outstanding, so the stream must not terminate yet.
        assert_eq!(poll!(fhm.next()), Poll::Pending);
        drop(tx1);
        assert_eq!(poll!(fhm.next()), Poll::Ready(Some(None)));
        // All futures have completed and been removed: the stream is now finished.
        assert_eq!(poll!(fhm.next()), Poll::Ready(None));
    }
}