1use super::*;
2
3use std::task::{Context, Waker};
4use std::time::Instant;
5use std::usize;
6
7#[derive(Debug)]
19pub(super) struct Stream {
20 pub id: StreamId,
22
23 pub state: State,
25
26 pub is_counted: bool,
29
30 pub ref_count: usize,
32
33 pub next_pending_send: Option<store::Key>,
36
37 pub is_pending_send: bool,
39
40 pub send_flow: FlowControl,
42
43 pub requested_send_capacity: WindowSize,
45
46 pub buffered_send_data: usize,
49
50 send_task: Option<Waker>,
52
53 pub pending_send: buffer::Deque,
55
56 pub next_pending_send_capacity: Option<store::Key>,
59
60 pub is_pending_send_capacity: bool,
62
63 pub send_capacity_inc: bool,
65
66 pub next_open: Option<store::Key>,
68
69 pub is_pending_open: bool,
71
72 pub is_pending_push: bool,
74
75 pub next_pending_accept: Option<store::Key>,
78
79 pub is_pending_accept: bool,
81
82 pub recv_flow: FlowControl,
84
85 pub in_flight_recv_data: WindowSize,
86
87 pub next_window_update: Option<store::Key>,
89
90 pub is_pending_window_update: bool,
92
93 pub reset_at: Option<Instant>,
95
96 pub next_reset_expire: Option<store::Key>,
98
99 pub pending_recv: buffer::Deque,
101
102 pub is_recv: bool,
104
105 pub recv_task: Option<Waker>,
107
108 pub pending_push_promises: store::Queue<NextAccept>,
110
111 pub content_length: ContentLength,
113}
114
/// Content-length tracking mode for a stream.
#[derive(Debug)]
pub enum ContentLength {
    /// Nothing is tracked: `Stream::dec_content_length` accepts any length.
    Omitted,
    /// `Stream::dec_content_length` rejects every non-zero length.
    Head,
    /// Amount still expected. `Stream::dec_content_length` subtracts from it
    /// and `Stream::ensure_content_length_zero` requires it to be zero.
    Remaining(u64),
}
122
123#[derive(Debug)]
124pub(super) struct NextAccept;
125
126#[derive(Debug)]
127pub(super) struct NextSend;
128
129#[derive(Debug)]
130pub(super) struct NextSendCapacity;
131
132#[derive(Debug)]
133pub(super) struct NextWindowUpdate;
134
135#[derive(Debug)]
136pub(super) struct NextOpen;
137
138#[derive(Debug)]
139pub(super) struct NextResetExpire;
140
141impl Stream {
142 pub fn new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream {
143 let mut send_flow = FlowControl::new();
144 let mut recv_flow = FlowControl::new();
145
146 recv_flow
147 .inc_window(init_recv_window)
148 .expect("invalid initial receive window");
149 let _res = recv_flow.assign_capacity(init_recv_window);
151 debug_assert!(_res.is_ok());
152
153 send_flow
154 .inc_window(init_send_window)
155 .expect("invalid initial send window size");
156
157 Stream {
158 id,
159 state: State::default(),
160 ref_count: 0,
161 is_counted: false,
162
163 next_pending_send: None,
165 is_pending_send: false,
166 send_flow,
167 requested_send_capacity: 0,
168 buffered_send_data: 0,
169 send_task: None,
170 pending_send: buffer::Deque::new(),
171 is_pending_send_capacity: false,
172 next_pending_send_capacity: None,
173 send_capacity_inc: false,
174 is_pending_open: false,
175 next_open: None,
176 is_pending_push: false,
177
178 next_pending_accept: None,
180 is_pending_accept: false,
181 recv_flow,
182 in_flight_recv_data: 0,
183 next_window_update: None,
184 is_pending_window_update: false,
185 reset_at: None,
186 next_reset_expire: None,
187 pending_recv: buffer::Deque::new(),
188 is_recv: true,
189 recv_task: None,
190 pending_push_promises: store::Queue::new(),
191 content_length: ContentLength::Omitted,
192 }
193 }
194
195 pub fn ref_inc(&mut self) {
197 assert!(self.ref_count < usize::MAX);
198 self.ref_count += 1;
199 }
200
201 pub fn ref_dec(&mut self) {
203 assert!(self.ref_count > 0);
204 self.ref_count -= 1;
205 }
206
207 pub fn is_pending_reset_expiration(&self) -> bool {
210 self.reset_at.is_some()
211 }
212
213 pub fn is_send_ready(&self) -> bool {
215 !self.is_pending_open && !self.is_pending_push
231 }
232
233 pub fn is_closed(&self) -> bool {
235 self.state.is_closed() &&
237 self.pending_send.is_empty() &&
240 self.buffered_send_data == 0
246 }
247
248 pub fn is_released(&self) -> bool {
250 self.is_closed() &&
252 self.ref_count == 0 &&
254 !self.is_pending_send && !self.is_pending_send_capacity &&
256 !self.is_pending_accept && !self.is_pending_window_update &&
257 !self.is_pending_open && self.reset_at.is_none()
258 }
259
260 pub fn is_canceled_interest(&self) -> bool {
266 self.ref_count == 0 && !self.state.is_closed()
267 }
268
269 pub fn capacity(&self, max_buffer_size: usize) -> WindowSize {
271 let available = self.send_flow.available().as_size() as usize;
272 let buffered = self.buffered_send_data;
273
274 available.min(max_buffer_size).saturating_sub(buffered) as WindowSize
275 }
276
277 pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) {
278 let prev_capacity = self.capacity(max_buffer_size);
279 debug_assert!(capacity > 0);
280 let _res = self.send_flow.assign_capacity(capacity);
282 debug_assert!(_res.is_ok());
283
284 tracing::trace!(
285 " assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
286 self.send_flow.available(),
287 self.buffered_send_data,
288 self.id,
289 max_buffer_size,
290 prev_capacity,
291 );
292
293 if prev_capacity < self.capacity(max_buffer_size) {
294 self.notify_capacity();
295 }
296 }
297
298 pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) {
299 let prev_capacity = self.capacity(max_buffer_size);
300
301 let _res = self.send_flow.send_data(len);
303 debug_assert!(_res.is_ok());
304
305 debug_assert!(self.buffered_send_data >= len as usize);
307 self.buffered_send_data -= len as usize;
308 self.requested_send_capacity -= len;
309
310 tracing::trace!(
311 " sent stream data; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
312 self.send_flow.available(),
313 self.buffered_send_data,
314 self.id,
315 max_buffer_size,
316 prev_capacity,
317 );
318
319 if prev_capacity < self.capacity(max_buffer_size) {
320 self.notify_capacity();
321 }
322 }
323
324 pub fn notify_capacity(&mut self) {
327 self.send_capacity_inc = true;
328 tracing::trace!(" notifying task");
329 self.notify_send();
330 }
331
332 pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> {
334 match self.content_length {
335 ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) {
336 Some(val) => *rem = val,
337 None => return Err(()),
338 },
339 ContentLength::Head => {
340 if len != 0 {
341 return Err(());
342 }
343 }
344 _ => {}
345 }
346
347 Ok(())
348 }
349
350 pub fn ensure_content_length_zero(&self) -> Result<(), ()> {
351 match self.content_length {
352 ContentLength::Remaining(0) => Ok(()),
353 ContentLength::Remaining(_) => Err(()),
354 _ => Ok(()),
355 }
356 }
357
358 pub fn notify_send(&mut self) {
359 if let Some(task) = self.send_task.take() {
360 task.wake();
361 }
362 }
363
364 pub fn wait_send(&mut self, cx: &Context) {
365 self.send_task = Some(cx.waker().clone());
366 }
367
368 pub fn notify_recv(&mut self) {
369 if let Some(task) = self.recv_task.take() {
370 task.wake();
371 }
372 }
373}
374
375impl store::Next for NextAccept {
376 fn next(stream: &Stream) -> Option<store::Key> {
377 stream.next_pending_accept
378 }
379
380 fn set_next(stream: &mut Stream, key: Option<store::Key>) {
381 stream.next_pending_accept = key;
382 }
383
384 fn take_next(stream: &mut Stream) -> Option<store::Key> {
385 stream.next_pending_accept.take()
386 }
387
388 fn is_queued(stream: &Stream) -> bool {
389 stream.is_pending_accept
390 }
391
392 fn set_queued(stream: &mut Stream, val: bool) {
393 stream.is_pending_accept = val;
394 }
395}
396
397impl store::Next for NextSend {
398 fn next(stream: &Stream) -> Option<store::Key> {
399 stream.next_pending_send
400 }
401
402 fn set_next(stream: &mut Stream, key: Option<store::Key>) {
403 stream.next_pending_send = key;
404 }
405
406 fn take_next(stream: &mut Stream) -> Option<store::Key> {
407 stream.next_pending_send.take()
408 }
409
410 fn is_queued(stream: &Stream) -> bool {
411 stream.is_pending_send
412 }
413
414 fn set_queued(stream: &mut Stream, val: bool) {
415 if val {
416 debug_assert!(!stream.is_pending_open);
419 }
420 stream.is_pending_send = val;
421 }
422}
423
424impl store::Next for NextSendCapacity {
425 fn next(stream: &Stream) -> Option<store::Key> {
426 stream.next_pending_send_capacity
427 }
428
429 fn set_next(stream: &mut Stream, key: Option<store::Key>) {
430 stream.next_pending_send_capacity = key;
431 }
432
433 fn take_next(stream: &mut Stream) -> Option<store::Key> {
434 stream.next_pending_send_capacity.take()
435 }
436
437 fn is_queued(stream: &Stream) -> bool {
438 stream.is_pending_send_capacity
439 }
440
441 fn set_queued(stream: &mut Stream, val: bool) {
442 stream.is_pending_send_capacity = val;
443 }
444}
445
446impl store::Next for NextWindowUpdate {
447 fn next(stream: &Stream) -> Option<store::Key> {
448 stream.next_window_update
449 }
450
451 fn set_next(stream: &mut Stream, key: Option<store::Key>) {
452 stream.next_window_update = key;
453 }
454
455 fn take_next(stream: &mut Stream) -> Option<store::Key> {
456 stream.next_window_update.take()
457 }
458
459 fn is_queued(stream: &Stream) -> bool {
460 stream.is_pending_window_update
461 }
462
463 fn set_queued(stream: &mut Stream, val: bool) {
464 stream.is_pending_window_update = val;
465 }
466}
467
468impl store::Next for NextOpen {
469 fn next(stream: &Stream) -> Option<store::Key> {
470 stream.next_open
471 }
472
473 fn set_next(stream: &mut Stream, key: Option<store::Key>) {
474 stream.next_open = key;
475 }
476
477 fn take_next(stream: &mut Stream) -> Option<store::Key> {
478 stream.next_open.take()
479 }
480
481 fn is_queued(stream: &Stream) -> bool {
482 stream.is_pending_open
483 }
484
485 fn set_queued(stream: &mut Stream, val: bool) {
486 if val {
487 debug_assert!(!stream.is_pending_send);
490 }
491 stream.is_pending_open = val;
492 }
493}
494
495impl store::Next for NextResetExpire {
496 fn next(stream: &Stream) -> Option<store::Key> {
497 stream.next_reset_expire
498 }
499
500 fn set_next(stream: &mut Stream, key: Option<store::Key>) {
501 stream.next_reset_expire = key;
502 }
503
504 fn take_next(stream: &mut Stream) -> Option<store::Key> {
505 stream.next_reset_expire.take()
506 }
507
508 fn is_queued(stream: &Stream) -> bool {
509 stream.reset_at.is_some()
510 }
511
512 fn set_queued(stream: &mut Stream, val: bool) {
513 if val {
514 stream.reset_at = Some(Instant::now());
515 } else {
516 stream.reset_at = None;
517 }
518 }
519}
520
521impl ContentLength {
524 pub fn is_head(&self) -> bool {
525 matches!(*self, Self::Head)
526 }
527}