h2/proto/streams/
counts.rs

1use super::*;
2
3use std::usize;
4
5#[derive(Debug)]
6pub(super) struct Counts {
7    /// Acting as a client or server. This allows us to track which values to
8    /// inc / dec.
9    peer: peer::Dyn,
10
11    /// Maximum number of locally initiated streams
12    max_send_streams: usize,
13
14    /// Current number of remote initiated streams
15    num_send_streams: usize,
16
17    /// Maximum number of remote initiated streams
18    max_recv_streams: usize,
19
20    /// Current number of locally initiated streams
21    num_recv_streams: usize,
22
23    /// Maximum number of pending locally reset streams
24    max_local_reset_streams: usize,
25
26    /// Current number of pending locally reset streams
27    num_local_reset_streams: usize,
28
29    /// Max number of "pending accept" streams that were remotely reset
30    max_remote_reset_streams: usize,
31
32    /// Current number of "pending accept" streams that were remotely reset
33    num_remote_reset_streams: usize,
34
35    /// Maximum number of locally reset streams due to protocol error across
36    /// the lifetime of the connection.
37    ///
38    /// When this gets exceeded, we issue GOAWAYs.
39    max_local_error_reset_streams: Option<usize>,
40
41    /// Total number of locally reset streams due to protocol error across the
42    /// lifetime of the connection.
43    num_local_error_reset_streams: usize,
44}
45
46impl Counts {
47    /// Create a new `Counts` using the provided configuration values.
48    pub fn new(peer: peer::Dyn, config: &Config) -> Self {
49        Counts {
50            peer,
51            max_send_streams: config.initial_max_send_streams,
52            num_send_streams: 0,
53            max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX),
54            num_recv_streams: 0,
55            max_local_reset_streams: config.local_reset_max,
56            num_local_reset_streams: 0,
57            max_remote_reset_streams: config.remote_reset_max,
58            num_remote_reset_streams: 0,
59            max_local_error_reset_streams: config.local_max_error_reset_streams,
60            num_local_error_reset_streams: 0,
61        }
62    }
63
64    /// Returns true when the next opened stream will reach capacity of outbound streams
65    ///
66    /// The number of client send streams is incremented in prioritize; send_request has to guess if
67    /// it should wait before allowing another request to be sent.
68    pub fn next_send_stream_will_reach_capacity(&self) -> bool {
69        self.max_send_streams <= (self.num_send_streams + 1)
70    }
71
72    /// Returns the current peer
73    pub fn peer(&self) -> peer::Dyn {
74        self.peer
75    }
76
77    pub fn has_streams(&self) -> bool {
78        self.num_send_streams != 0 || self.num_recv_streams != 0
79    }
80
81    /// Returns true if we can issue another local reset due to protocol error.
82    pub fn can_inc_num_local_error_resets(&self) -> bool {
83        if let Some(max) = self.max_local_error_reset_streams {
84            max > self.num_local_error_reset_streams
85        } else {
86            true
87        }
88    }
89
90    pub fn inc_num_local_error_resets(&mut self) {
91        assert!(self.can_inc_num_local_error_resets());
92
93        // Increment the number of remote initiated streams
94        self.num_local_error_reset_streams += 1;
95    }
96
97    pub(crate) fn max_local_error_resets(&self) -> Option<usize> {
98        self.max_local_error_reset_streams
99    }
100
101    /// Returns true if the receive stream concurrency can be incremented
102    pub fn can_inc_num_recv_streams(&self) -> bool {
103        self.max_recv_streams > self.num_recv_streams
104    }
105
106    /// Increments the number of concurrent receive streams.
107    ///
108    /// # Panics
109    ///
110    /// Panics on failure as this should have been validated before hand.
111    pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) {
112        assert!(self.can_inc_num_recv_streams());
113        assert!(!stream.is_counted);
114
115        // Increment the number of remote initiated streams
116        self.num_recv_streams += 1;
117        stream.is_counted = true;
118    }
119
120    /// Returns true if the send stream concurrency can be incremented
121    pub fn can_inc_num_send_streams(&self) -> bool {
122        self.max_send_streams > self.num_send_streams
123    }
124
125    /// Increments the number of concurrent send streams.
126    ///
127    /// # Panics
128    ///
129    /// Panics on failure as this should have been validated before hand.
130    pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) {
131        assert!(self.can_inc_num_send_streams());
132        assert!(!stream.is_counted);
133
134        // Increment the number of remote initiated streams
135        self.num_send_streams += 1;
136        stream.is_counted = true;
137    }
138
139    /// Returns true if the number of pending reset streams can be incremented.
140    pub fn can_inc_num_reset_streams(&self) -> bool {
141        self.max_local_reset_streams > self.num_local_reset_streams
142    }
143
144    /// Increments the number of pending reset streams.
145    ///
146    /// # Panics
147    ///
148    /// Panics on failure as this should have been validated before hand.
149    pub fn inc_num_reset_streams(&mut self) {
150        assert!(self.can_inc_num_reset_streams());
151
152        self.num_local_reset_streams += 1;
153    }
154
155    pub(crate) fn max_remote_reset_streams(&self) -> usize {
156        self.max_remote_reset_streams
157    }
158
159    /// Returns true if the number of pending REMOTE reset streams can be
160    /// incremented.
161    pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool {
162        self.max_remote_reset_streams > self.num_remote_reset_streams
163    }
164
165    /// Increments the number of pending REMOTE reset streams.
166    ///
167    /// # Panics
168    ///
169    /// Panics on failure as this should have been validated before hand.
170    pub(crate) fn inc_num_remote_reset_streams(&mut self) {
171        assert!(self.can_inc_num_remote_reset_streams());
172
173        self.num_remote_reset_streams += 1;
174    }
175
176    pub(crate) fn dec_num_remote_reset_streams(&mut self) {
177        assert!(self.num_remote_reset_streams > 0);
178
179        self.num_remote_reset_streams -= 1;
180    }
181
182    pub fn apply_remote_settings(&mut self, settings: &frame::Settings, is_initial: bool) {
183        match settings.max_concurrent_streams() {
184            Some(val) => self.max_send_streams = val as usize,
185            None if is_initial => self.max_send_streams = usize::MAX,
186            None => {}
187        }
188    }
189
190    /// Run a block of code that could potentially transition a stream's state.
191    ///
192    /// If the stream state transitions to closed, this function will perform
193    /// all necessary cleanup.
194    ///
195    /// TODO: Is this function still needed?
196    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
197    where
198        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
199    {
200        // TODO: Does this need to be computed before performing the action?
201        let is_pending_reset = stream.is_pending_reset_expiration();
202
203        // Run the action
204        let ret = f(self, &mut stream);
205
206        self.transition_after(stream, is_pending_reset);
207
208        ret
209    }
210
211    // TODO: move this to macro?
212    pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) {
213        tracing::trace!(
214            "transition_after; stream={:?}; state={:?}; is_closed={:?}; \
215             pending_send_empty={:?}; buffered_send_data={}; \
216             num_recv={}; num_send={}",
217            stream.id,
218            stream.state,
219            stream.is_closed(),
220            stream.pending_send.is_empty(),
221            stream.buffered_send_data,
222            self.num_recv_streams,
223            self.num_send_streams
224        );
225
226        if stream.is_closed() {
227            if !stream.is_pending_reset_expiration() {
228                stream.unlink();
229                if is_reset_counted {
230                    self.dec_num_reset_streams();
231                }
232            }
233
234            if stream.is_counted {
235                tracing::trace!("dec_num_streams; stream={:?}", stream.id);
236                // Decrement the number of active streams.
237                self.dec_num_streams(&mut stream);
238            }
239        }
240
241        // Release the stream if it requires releasing
242        if stream.is_released() {
243            stream.remove();
244        }
245    }
246
247    /// Returns the maximum number of streams that can be initiated by this
248    /// peer.
249    pub(crate) fn max_send_streams(&self) -> usize {
250        self.max_send_streams
251    }
252
253    /// Returns the maximum number of streams that can be initiated by the
254    /// remote peer.
255    pub(crate) fn max_recv_streams(&self) -> usize {
256        self.max_recv_streams
257    }
258
259    fn dec_num_streams(&mut self, stream: &mut store::Ptr) {
260        assert!(stream.is_counted);
261
262        if self.peer.is_local_init(stream.id) {
263            assert!(self.num_send_streams > 0);
264            self.num_send_streams -= 1;
265            stream.is_counted = false;
266        } else {
267            assert!(self.num_recv_streams > 0);
268            self.num_recv_streams -= 1;
269            stream.is_counted = false;
270        }
271    }
272
273    fn dec_num_reset_streams(&mut self) {
274        assert!(self.num_local_reset_streams > 0);
275        self.num_local_reset_streams -= 1;
276    }
277}
278
279impl Drop for Counts {
280    fn drop(&mut self) {
281        use std::thread;
282
283        if !thread::panicking() {
284            debug_assert!(!self.has_streams());
285        }
286    }
287}