h2/proto/streams/
store.rs

1use super::*;
2
3use indexmap::{self, IndexMap};
4
5use std::convert::Infallible;
6use std::fmt;
7use std::marker::PhantomData;
8use std::ops;
9
/// Storage for streams.
///
/// Stream state lives in `slab`; `ids` maps a `StreamId` to the slab slot
/// that holds the state for that stream.
#[derive(Debug)]
pub(super) struct Store {
    // Backing storage for the `Stream` values.
    slab: slab::Slab<Stream>,
    // Lookup table from stream ID to the stream's slot in `slab`.
    ids: IndexMap<StreamId, SlabIndex>,
}
16
/// "Pointer" to an entry in the store.
///
/// Pairs a `Key` with a mutable borrow of the `Store` the key is resolved
/// against.
pub(super) struct Ptr<'a> {
    // Identifies the entry this pointer refers to.
    key: Key,
    // The store that `key` is looked up in.
    store: &'a mut Store,
}
22
/// References an entry in the store.
///
/// A key is a small `Copy` value; it does not borrow the store.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct Key {
    /// Slot in the store's slab that holds the stream state.
    index: SlabIndex,
    /// Keep the stream ID in the key as an ABA guard, since slab indices
    /// could be re-used with a new stream.
    stream_id: StreamId,
}
31
// We can never have more than `StreamId::MAX` streams in the store,
// so we can save a smaller index (u32 vs usize).
//
// The wrapped value is a slab slot number; it is widened back to `usize`
// whenever the slab is accessed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SlabIndex(u32);
36
/// A queue of streams.
///
/// The queue itself only records the keys of its first and last stream; the
/// link from one queued stream to the next is read and written through the
/// `Next` implementation `N`.
#[derive(Debug)]
pub(super) struct Queue<N> {
    // Head and tail keys of the list, or `None` when the queue is empty.
    indices: Option<store::Indices>,
    // `N` is used only at the type level, to pick the `Next` implementation.
    _p: PhantomData<N>,
}
42
/// Accessors `Queue<N>` uses to read and write a stream's link to the next
/// queued stream and its "queued" flag.
///
/// NOTE(review): implementations are not in this file; the per-method notes
/// describe how `Queue` relies on them and should be confirmed against the
/// implementors.
pub(super) trait Next {
    /// Returns the key `stream` links to, if any. `Queue` expects `None` for
    /// a stream that is about to be pushed and for the only stream in a list.
    fn next(stream: &Stream) -> Option<Key>;

    /// Sets (or clears, with `None`) the key `stream` links to.
    fn set_next(stream: &mut Stream, key: Option<Key>);

    /// Returns the key `stream` links to; `Queue::pop` uses it as the new
    /// head. Presumably this also clears the link -- confirm with implementors.
    fn take_next(stream: &mut Stream) -> Option<Key>;

    /// Reports whether `stream` is flagged as being in the queue.
    fn is_queued(stream: &Stream) -> bool;

    /// Updates the flag that `is_queued` reports (presumably; confirm with
    /// implementors).
    fn set_queued(stream: &mut Stream, val: bool);
}
54
/// A linked list, described by the keys of its two ends.
///
/// Both fields hold the same key when the list contains a single stream.
#[derive(Debug, Clone, Copy)]
struct Indices {
    // Key of the first stream in the list (the next one to be popped).
    pub head: Key,
    // Key of the last stream in the list.
    pub tail: Key,
}
61
/// Result of looking up a stream ID with `Store::find_entry`.
pub(super) enum Entry<'a> {
    /// The ID is already present in the store's ID map.
    Occupied(OccupiedEntry<'a>),
    /// The ID is not present; a stream can be inserted through the entry.
    Vacant(VacantEntry<'a>),
}
66
/// A view of a stream ID that is already present in the store's ID map.
pub(super) struct OccupiedEntry<'a> {
    // The underlying occupied entry of the ID map.
    ids: indexmap::map::OccupiedEntry<'a, StreamId, SlabIndex>,
}
70
/// A view of a stream ID that is not yet present in the store's ID map.
pub(super) struct VacantEntry<'a> {
    // The underlying vacant entry of the ID map.
    ids: indexmap::map::VacantEntry<'a, StreamId, SlabIndex>,
    // The slab a newly inserted stream's state is stored in.
    slab: &'a mut slab::Slab<Stream>,
}
75
/// Something that can turn a `Key` into a `Ptr`.
///
/// Implemented in this file by `Store` and by `Ptr`.
pub(super) trait Resolve {
    /// Returns a `Ptr` for `key` that borrows from `self`.
    fn resolve(&mut self, key: Key) -> Ptr;
}
79
80// ===== impl Store =====
81
82impl Store {
83    pub fn new() -> Self {
84        Store {
85            slab: slab::Slab::new(),
86            ids: IndexMap::new(),
87        }
88    }
89
90    pub fn find_mut(&mut self, id: &StreamId) -> Option<Ptr> {
91        let index = match self.ids.get(id) {
92            Some(key) => *key,
93            None => return None,
94        };
95
96        Some(Ptr {
97            key: Key {
98                index,
99                stream_id: *id,
100            },
101            store: self,
102        })
103    }
104
105    pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr {
106        let index = SlabIndex(self.slab.insert(val) as u32);
107        assert!(self.ids.insert(id, index).is_none());
108
109        Ptr {
110            key: Key {
111                index,
112                stream_id: id,
113            },
114            store: self,
115        }
116    }
117
118    pub fn find_entry(&mut self, id: StreamId) -> Entry {
119        use self::indexmap::map::Entry::*;
120
121        match self.ids.entry(id) {
122            Occupied(e) => Entry::Occupied(OccupiedEntry { ids: e }),
123            Vacant(e) => Entry::Vacant(VacantEntry {
124                ids: e,
125                slab: &mut self.slab,
126            }),
127        }
128    }
129
130    #[allow(clippy::blocks_in_conditions)]
131    pub(crate) fn for_each<F>(&mut self, mut f: F)
132    where
133        F: FnMut(Ptr),
134    {
135        match self.try_for_each(|ptr| {
136            f(ptr);
137            Ok::<_, Infallible>(())
138        }) {
139            Ok(()) => (),
140            Err(infallible) => match infallible {},
141        }
142    }
143
144    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145    where
146        F: FnMut(Ptr) -> Result<(), E>,
147    {
148        let mut len = self.ids.len();
149        let mut i = 0;
150
151        while i < len {
152            // Get the key by index, this makes the borrow checker happy
153            let (stream_id, index) = {
154                let entry = self.ids.get_index(i).unwrap();
155                (*entry.0, *entry.1)
156            };
157
158            f(Ptr {
159                key: Key { index, stream_id },
160                store: self,
161            })?;
162
163            // TODO: This logic probably could be better...
164            let new_len = self.ids.len();
165
166            if new_len < len {
167                debug_assert!(new_len == len - 1);
168                len -= 1;
169            } else {
170                i += 1;
171            }
172        }
173
174        Ok(())
175    }
176}
177
178impl Resolve for Store {
179    fn resolve(&mut self, key: Key) -> Ptr {
180        Ptr { key, store: self }
181    }
182}
183
184impl ops::Index<Key> for Store {
185    type Output = Stream;
186
187    fn index(&self, key: Key) -> &Self::Output {
188        self.slab
189            .get(key.index.0 as usize)
190            .filter(|s| s.id == key.stream_id)
191            .unwrap_or_else(|| {
192                panic!("dangling store key for stream_id={:?}", key.stream_id);
193            })
194    }
195}
196
197impl ops::IndexMut<Key> for Store {
198    fn index_mut(&mut self, key: Key) -> &mut Self::Output {
199        self.slab
200            .get_mut(key.index.0 as usize)
201            .filter(|s| s.id == key.stream_id)
202            .unwrap_or_else(|| {
203                panic!("dangling store key for stream_id={:?}", key.stream_id);
204            })
205    }
206}
207
impl Store {
    /// Number of entries in the ID map.
    ///
    /// A stream stops being counted here once `Ptr::unlink` has removed its
    /// ID, even if its state is still held in the slab.
    #[cfg(feature = "unstable")]
    pub fn num_active_streams(&self) -> usize {
        self.ids.len()
    }

    /// Number of streams whose state is held in the slab.
    ///
    /// A stream is counted here until `Ptr::remove` takes its state out.
    #[cfg(feature = "unstable")]
    pub fn num_wired_streams(&self) -> usize {
        self.slab.len()
    }
}
219
// Enabled while running the h2 unit/integration tests.
//
// Nothing depends on the store being empty when it is dropped, but checking
// it in the tests helps confirm that streams are cleaned up wherever they
// can be (as opposed to, say, the runtime dropping the task for unknown
// reasons).
#[cfg(feature = "unstable")]
impl Drop for Store {
    fn drop(&mut self) {
        // Skip the check while unwinding: a second panic would abort.
        if std::thread::panicking() {
            return;
        }

        debug_assert!(self.slab.is_empty());
    }
}
235
236// ===== impl Queue =====
237
238impl<N> Queue<N>
239where
240    N: Next,
241{
242    pub fn new() -> Self {
243        Queue {
244            indices: None,
245            _p: PhantomData,
246        }
247    }
248
249    pub fn take(&mut self) -> Self {
250        Queue {
251            indices: self.indices.take(),
252            _p: PhantomData,
253        }
254    }
255
256    /// Queue the stream.
257    ///
258    /// If the stream is already contained by the list, return `false`.
259    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260        tracing::trace!("Queue::push_back");
261
262        if N::is_queued(stream) {
263            tracing::trace!(" -> already queued");
264            return false;
265        }
266
267        N::set_queued(stream, true);
268
269        // The next pointer shouldn't be set
270        debug_assert!(N::next(stream).is_none());
271
272        // Queue the stream
273        match self.indices {
274            Some(ref mut idxs) => {
275                tracing::trace!(" -> existing entries");
276
277                // Update the current tail node to point to `stream`
278                let key = stream.key();
279                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
281                // Update the tail pointer
282                idxs.tail = stream.key();
283            }
284            None => {
285                tracing::trace!(" -> first entry");
286                self.indices = Some(store::Indices {
287                    head: stream.key(),
288                    tail: stream.key(),
289                });
290            }
291        }
292
293        true
294    }
295
296    /// Queue the stream
297    ///
298    /// If the stream is already contained by the list, return `false`.
299    pub fn push_front(&mut self, stream: &mut store::Ptr) -> bool {
300        tracing::trace!("Queue::push_front");
301
302        if N::is_queued(stream) {
303            tracing::trace!(" -> already queued");
304            return false;
305        }
306
307        N::set_queued(stream, true);
308
309        // The next pointer shouldn't be set
310        debug_assert!(N::next(stream).is_none());
311
312        // Queue the stream
313        match self.indices {
314            Some(ref mut idxs) => {
315                tracing::trace!(" -> existing entries");
316
317                // Update the provided stream to point to the head node
318                let head_key = stream.resolve(idxs.head).key();
319                N::set_next(stream, Some(head_key));
320
321                // Update the head pointer
322                idxs.head = stream.key();
323            }
324            None => {
325                tracing::trace!(" -> first entry");
326                self.indices = Some(store::Indices {
327                    head: stream.key(),
328                    tail: stream.key(),
329                });
330            }
331        }
332
333        true
334    }
335
336    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337    where
338        R: Resolve,
339    {
340        if let Some(mut idxs) = self.indices {
341            let mut stream = store.resolve(idxs.head);
342
343            if idxs.head == idxs.tail {
344                assert!(N::next(&stream).is_none());
345                self.indices = None;
346            } else {
347                idxs.head = N::take_next(&mut stream).unwrap();
348                self.indices = Some(idxs);
349            }
350
351            debug_assert!(N::is_queued(&stream));
352            N::set_queued(&mut stream, false);
353
354            return Some(stream);
355        }
356
357        None
358    }
359
360    pub fn is_empty(&self) -> bool {
361        self.indices.is_none()
362    }
363
364    pub fn pop_if<'a, R, F>(&mut self, store: &'a mut R, f: F) -> Option<store::Ptr<'a>>
365    where
366        R: Resolve,
367        F: Fn(&Stream) -> bool,
368    {
369        if let Some(idxs) = self.indices {
370            let should_pop = f(&store.resolve(idxs.head));
371            if should_pop {
372                return self.pop(store);
373            }
374        }
375
376        None
377    }
378}
379
380// ===== impl Ptr =====
381
382impl<'a> Ptr<'a> {
383    /// Returns the Key associated with the stream
384    pub fn key(&self) -> Key {
385        self.key
386    }
387
388    pub fn store_mut(&mut self) -> &mut Store {
389        self.store
390    }
391
392    /// Remove the stream from the store
393    pub fn remove(self) -> StreamId {
394        // The stream must have been unlinked before this point
395        debug_assert!(!self.store.ids.contains_key(&self.key.stream_id));
396
397        // Remove the stream state
398        let stream = self.store.slab.remove(self.key.index.0 as usize);
399        assert_eq!(stream.id, self.key.stream_id);
400        stream.id
401    }
402
403    /// Remove the StreamId -> stream state association.
404    ///
405    /// This will effectively remove the stream as far as the H2 protocol is
406    /// concerned.
407    pub fn unlink(&mut self) {
408        let id = self.key.stream_id;
409        self.store.ids.swap_remove(&id);
410    }
411}
412
413impl<'a> Resolve for Ptr<'a> {
414    fn resolve(&mut self, key: Key) -> Ptr {
415        Ptr {
416            key,
417            store: &mut *self.store,
418        }
419    }
420}
421
422impl<'a> ops::Deref for Ptr<'a> {
423    type Target = Stream;
424
425    fn deref(&self) -> &Stream {
426        &self.store[self.key]
427    }
428}
429
430impl<'a> ops::DerefMut for Ptr<'a> {
431    fn deref_mut(&mut self) -> &mut Stream {
432        &mut self.store[self.key]
433    }
434}
435
436impl<'a> fmt::Debug for Ptr<'a> {
437    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
438        (**self).fmt(fmt)
439    }
440}
441
442// ===== impl OccupiedEntry =====
443
444impl<'a> OccupiedEntry<'a> {
445    pub fn key(&self) -> Key {
446        let stream_id = *self.ids.key();
447        let index = *self.ids.get();
448        Key { index, stream_id }
449    }
450}
451
452// ===== impl VacantEntry =====
453
454impl<'a> VacantEntry<'a> {
455    pub fn insert(self, value: Stream) -> Key {
456        // Insert the value in the slab
457        let stream_id = value.id;
458        let index = SlabIndex(self.slab.insert(value) as u32);
459
460        // Insert the handle in the ID map
461        self.ids.insert(index);
462
463        Key { index, stream_id }
464    }
465}