1use super::*;
23use std::usize;
45#[derive(Debug)]
6pub(super) struct Counts {
7/// Acting as a client or server. This allows us to track which values to
8 /// inc / dec.
9peer: peer::Dyn,
1011/// Maximum number of locally initiated streams
12max_send_streams: usize,
1314/// Current number of remote initiated streams
15num_send_streams: usize,
1617/// Maximum number of remote initiated streams
18max_recv_streams: usize,
1920/// Current number of locally initiated streams
21num_recv_streams: usize,
2223/// Maximum number of pending locally reset streams
24max_local_reset_streams: usize,
2526/// Current number of pending locally reset streams
27num_local_reset_streams: usize,
2829/// Max number of "pending accept" streams that were remotely reset
30max_remote_reset_streams: usize,
3132/// Current number of "pending accept" streams that were remotely reset
33num_remote_reset_streams: usize,
3435/// Maximum number of locally reset streams due to protocol error across
36 /// the lifetime of the connection.
37 ///
38 /// When this gets exceeded, we issue GOAWAYs.
39max_local_error_reset_streams: Option<usize>,
4041/// Total number of locally reset streams due to protocol error across the
42 /// lifetime of the connection.
43num_local_error_reset_streams: usize,
44}
4546impl Counts {
47/// Create a new `Counts` using the provided configuration values.
48pub fn new(peer: peer::Dyn, config: &Config) -> Self {
49 Counts {
50 peer,
51 max_send_streams: config.initial_max_send_streams,
52 num_send_streams: 0,
53 max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX),
54 num_recv_streams: 0,
55 max_local_reset_streams: config.local_reset_max,
56 num_local_reset_streams: 0,
57 max_remote_reset_streams: config.remote_reset_max,
58 num_remote_reset_streams: 0,
59 max_local_error_reset_streams: config.local_max_error_reset_streams,
60 num_local_error_reset_streams: 0,
61 }
62 }
6364/// Returns true when the next opened stream will reach capacity of outbound streams
65 ///
66 /// The number of client send streams is incremented in prioritize; send_request has to guess if
67 /// it should wait before allowing another request to be sent.
68pub fn next_send_stream_will_reach_capacity(&self) -> bool {
69self.max_send_streams <= (self.num_send_streams + 1)
70 }
7172/// Returns the current peer
73pub fn peer(&self) -> peer::Dyn {
74self.peer
75 }
7677pub fn has_streams(&self) -> bool {
78self.num_send_streams != 0 || self.num_recv_streams != 0
79}
8081/// Returns true if we can issue another local reset due to protocol error.
82pub fn can_inc_num_local_error_resets(&self) -> bool {
83if let Some(max) = self.max_local_error_reset_streams {
84 max > self.num_local_error_reset_streams
85 } else {
86true
87}
88 }
8990pub fn inc_num_local_error_resets(&mut self) {
91assert!(self.can_inc_num_local_error_resets());
9293// Increment the number of remote initiated streams
94self.num_local_error_reset_streams += 1;
95 }
9697pub(crate) fn max_local_error_resets(&self) -> Option<usize> {
98self.max_local_error_reset_streams
99 }
100101/// Returns true if the receive stream concurrency can be incremented
102pub fn can_inc_num_recv_streams(&self) -> bool {
103self.max_recv_streams > self.num_recv_streams
104 }
105106/// Increments the number of concurrent receive streams.
107 ///
108 /// # Panics
109 ///
110 /// Panics on failure as this should have been validated before hand.
111pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) {
112assert!(self.can_inc_num_recv_streams());
113assert!(!stream.is_counted);
114115// Increment the number of remote initiated streams
116self.num_recv_streams += 1;
117 stream.is_counted = true;
118 }
119120/// Returns true if the send stream concurrency can be incremented
121pub fn can_inc_num_send_streams(&self) -> bool {
122self.max_send_streams > self.num_send_streams
123 }
124125/// Increments the number of concurrent send streams.
126 ///
127 /// # Panics
128 ///
129 /// Panics on failure as this should have been validated before hand.
130pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) {
131assert!(self.can_inc_num_send_streams());
132assert!(!stream.is_counted);
133134// Increment the number of remote initiated streams
135self.num_send_streams += 1;
136 stream.is_counted = true;
137 }
138139/// Returns true if the number of pending reset streams can be incremented.
140pub fn can_inc_num_reset_streams(&self) -> bool {
141self.max_local_reset_streams > self.num_local_reset_streams
142 }
143144/// Increments the number of pending reset streams.
145 ///
146 /// # Panics
147 ///
148 /// Panics on failure as this should have been validated before hand.
149pub fn inc_num_reset_streams(&mut self) {
150assert!(self.can_inc_num_reset_streams());
151152self.num_local_reset_streams += 1;
153 }
154155pub(crate) fn max_remote_reset_streams(&self) -> usize {
156self.max_remote_reset_streams
157 }
158159/// Returns true if the number of pending REMOTE reset streams can be
160 /// incremented.
161pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool {
162self.max_remote_reset_streams > self.num_remote_reset_streams
163 }
164165/// Increments the number of pending REMOTE reset streams.
166 ///
167 /// # Panics
168 ///
169 /// Panics on failure as this should have been validated before hand.
170pub(crate) fn inc_num_remote_reset_streams(&mut self) {
171assert!(self.can_inc_num_remote_reset_streams());
172173self.num_remote_reset_streams += 1;
174 }
175176pub(crate) fn dec_num_remote_reset_streams(&mut self) {
177assert!(self.num_remote_reset_streams > 0);
178179self.num_remote_reset_streams -= 1;
180 }
181182pub fn apply_remote_settings(&mut self, settings: &frame::Settings, is_initial: bool) {
183match settings.max_concurrent_streams() {
184Some(val) => self.max_send_streams = val as usize,
185None if is_initial => self.max_send_streams = usize::MAX,
186None => {}
187 }
188 }
189190/// Run a block of code that could potentially transition a stream's state.
191 ///
192 /// If the stream state transitions to closed, this function will perform
193 /// all necessary cleanup.
194 ///
195 /// TODO: Is this function still needed?
196pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
197where
198F: FnOnce(&mut Self, &mut store::Ptr) -> U,
199 {
200// TODO: Does this need to be computed before performing the action?
201let is_pending_reset = stream.is_pending_reset_expiration();
202203// Run the action
204let ret = f(self, &mut stream);
205206self.transition_after(stream, is_pending_reset);
207208 ret
209 }
210211// TODO: move this to macro?
212pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) {
213tracing::trace!(
214"transition_after; stream={:?}; state={:?}; is_closed={:?}; \
215 pending_send_empty={:?}; buffered_send_data={}; \
216 num_recv={}; num_send={}",
217 stream.id,
218 stream.state,
219 stream.is_closed(),
220 stream.pending_send.is_empty(),
221 stream.buffered_send_data,
222self.num_recv_streams,
223self.num_send_streams
224 );
225226if stream.is_closed() {
227if !stream.is_pending_reset_expiration() {
228 stream.unlink();
229if is_reset_counted {
230self.dec_num_reset_streams();
231 }
232 }
233234if stream.is_counted {
235tracing::trace!("dec_num_streams; stream={:?}", stream.id);
236// Decrement the number of active streams.
237self.dec_num_streams(&mut stream);
238 }
239 }
240241// Release the stream if it requires releasing
242if stream.is_released() {
243 stream.remove();
244 }
245 }
246247/// Returns the maximum number of streams that can be initiated by this
248 /// peer.
249pub(crate) fn max_send_streams(&self) -> usize {
250self.max_send_streams
251 }
252253/// Returns the maximum number of streams that can be initiated by the
254 /// remote peer.
255pub(crate) fn max_recv_streams(&self) -> usize {
256self.max_recv_streams
257 }
258259fn dec_num_streams(&mut self, stream: &mut store::Ptr) {
260assert!(stream.is_counted);
261262if self.peer.is_local_init(stream.id) {
263assert!(self.num_send_streams > 0);
264self.num_send_streams -= 1;
265 stream.is_counted = false;
266 } else {
267assert!(self.num_recv_streams > 0);
268self.num_recv_streams -= 1;
269 stream.is_counted = false;
270 }
271 }
272273fn dec_num_reset_streams(&mut self) {
274assert!(self.num_local_reset_streams > 0);
275self.num_local_reset_streams -= 1;
276 }
277}
278279impl Drop for Counts {
280fn drop(&mut self) {
281use std::thread;
282283if !thread::panicking() {
284debug_assert!(!self.has_streams());
285 }
286 }
287}