// kube_runtime/controller/future_hash_map.rs
1use futures::{Future, FutureExt, Stream};
2use std::{
3 collections::HashMap,
4 hash::Hash,
5 pin::Pin,
6 task::{Context, Poll},
7};
8
/// A keyed collection of futures backed by a [`HashMap`].
///
/// Entries are added with `insert` and looked up by key. When `F` is an
/// `Unpin` future, the map can also be polled as a `Stream`: every stored
/// future is driven, each output is yielded once, and the finished entry is
/// removed from the map.
///
/// The polling order follows the iteration order of the underlying
/// [`HashMap`], so callers must not rely on outputs arriving in any
/// particular order.
#[derive(Debug)]
pub struct FutureHashMap<K, F> {
    /// Stored futures, indexed by their key.
    futures: HashMap<K, F>,
}
22
23impl<K, F> Default for FutureHashMap<K, F> {
24 fn default() -> Self {
25 Self {
26 futures: HashMap::new(),
27 }
28 }
29}
30
31impl<K, F> FutureHashMap<K, F>
32where
33 K: Hash + Eq,
34{
35 pub fn insert(&mut self, key: K, future: F) -> Option<F> {
37 self.futures.insert(key, future)
38 }
39
40 pub fn contains_key(&self, key: &K) -> bool {
41 self.futures.contains_key(key)
42 }
43
44 pub fn len(&self) -> usize {
45 self.futures.len()
46 }
47}
48
49impl<K, F> Stream for FutureHashMap<K, F>
50where
51 K: Hash + Clone + Eq,
52 F: Future + Unpin,
53 Self: Unpin,
54{
55 type Item = F::Output;
56
57 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
58 let key_and_msg =
59 self.as_mut()
60 .futures
61 .iter_mut()
62 .find_map(|(key, future)| match future.poll_unpin(cx) {
63 Poll::Ready(msg) => Some((key.clone(), msg)),
64 Poll::Pending => None,
65 });
66 match key_and_msg {
68 Some((key, msg)) => {
69 self.as_mut().futures.remove(&key);
70 Poll::Ready(Some(msg))
71 }
72 None if self.futures.is_empty() => Poll::Ready(None),
73 None => Poll::Pending,
74 }
75 }
76}
77
#[cfg(test)]
mod tests {
    use std::{future, task::Poll};

    use super::FutureHashMap;
    use futures::{channel::mpsc, poll, StreamExt};

    /// Every inserted future's output is yielded once, after which the stream ends.
    #[tokio::test]
    async fn fhm_should_forward_all_values_and_shut_down() {
        const COUNT: u16 = 100;

        let mut fhm = FutureHashMap::default();
        for value in 0..COUNT {
            fhm.insert(value, future::ready(value));
        }

        // Outputs arrive in the map's iteration order, so sort before comparing.
        let mut received: Vec<u16> = fhm.collect().await;
        received.sort_unstable();

        let expected: Vec<u16> = (0..COUNT).collect();
        assert_eq!(received, expected);
    }

    /// The stream stays pending while any future is unfinished and only ends
    /// once the last one has produced its output.
    #[tokio::test]
    async fn fhm_should_stay_alive_until_all_sources_finish() {
        let mut fhm = FutureHashMap::default();
        let (first_tx, mut first_rx) = mpsc::unbounded::<()>();
        let (second_tx, mut second_rx) = mpsc::unbounded::<()>();
        fhm.insert(0, first_rx.next());
        fhm.insert(1, second_rx.next());

        // Both senders are alive, so neither receiver future can resolve yet.
        assert!(poll!(fhm.next()).is_pending());

        // Closing a channel makes its `next()` future resolve to `None`.
        drop(first_tx);
        assert_eq!(poll!(fhm.next()), Poll::Ready(Some(None)));
        assert!(poll!(fhm.next()).is_pending());

        drop(second_tx);
        assert_eq!(poll!(fhm.next()), Poll::Ready(Some(None)));

        // With no futures left the stream itself terminates.
        assert_eq!(poll!(fhm.next()), Poll::Ready(None));
    }
}