h2/proto/streams/
stream.rs

1use super::*;
2
3use std::task::{Context, Waker};
4use std::time::Instant;
5use std::usize;
6
7/// Tracks Stream related state
8///
9/// # Reference counting
10///
11/// There can be a number of outstanding handles to a single Stream. These are
12/// tracked using reference counting. The `ref_count` field represents the
13/// number of outstanding userspace handles that can reach this stream.
14///
15/// It's important to note that when the stream is placed in an internal queue
16/// (such as an accept queue), this is **not** tracked by a reference count.
17/// Thus, `ref_count` can be zero and the stream still has to be kept around.
18#[derive(Debug)]
19pub(super) struct Stream {
20    /// The h2 stream identifier
21    pub id: StreamId,
22
23    /// Current state of the stream
24    pub state: State,
25
26    /// Set to `true` when the stream is counted against the connection's max
27    /// concurrent streams.
28    pub is_counted: bool,
29
30    /// Number of outstanding handles pointing to this stream
31    pub ref_count: usize,
32
33    // ===== Fields related to sending =====
34    /// Next node in the pending-send linked list
35    pub next_pending_send: Option<store::Key>,
36
37    /// Set to true when the stream is queued in the pending-send list
38    pub is_pending_send: bool,
39
40    /// Send data flow control
41    pub send_flow: FlowControl,
42
43    /// Amount of send capacity that has been requested, but not yet allocated.
44    pub requested_send_capacity: WindowSize,
45
46    /// Amount of data buffered at the prioritization layer.
47    /// TODO: Technically this could be greater than the window size...
48    pub buffered_send_data: usize,
49
50    /// Task tracking additional send capacity (i.e. window updates).
51    send_task: Option<Waker>,
52
53    /// Frames pending for this stream being sent to the socket
54    pub pending_send: buffer::Deque,
55
56    /// Next node in the linked list of streams waiting for additional
57    /// connection level capacity.
58    pub next_pending_send_capacity: Option<store::Key>,
59
60    /// True if the stream is waiting for outbound connection capacity
61    pub is_pending_send_capacity: bool,
62
63    /// Set to true when the send capacity has been incremented
64    pub send_capacity_inc: bool,
65
66    /// Next node in the open linked list
67    pub next_open: Option<store::Key>,
68
69    /// Set to true when the stream is pending to be opened
70    pub is_pending_open: bool,
71
72    /// Set to true when a push is pending for this stream
73    pub is_pending_push: bool,
74
75    // ===== Fields related to receiving =====
76    /// Next node in the accept linked list
77    pub next_pending_accept: Option<store::Key>,
78
79    /// Set to true when the stream is pending accept
80    pub is_pending_accept: bool,
81
82    /// Receive data flow control
83    pub recv_flow: FlowControl,
84
    /// Starts at zero in `Stream::new`.
    /// NOTE(review): presumably the amount of received data that has not yet
    /// been released back to the peer -- confirm against the receive logic.
85    pub in_flight_recv_data: WindowSize,
86
87    /// Next node in the linked list of streams waiting to send window updates.
88    pub next_window_update: Option<store::Key>,
89
90    /// True if the stream is waiting to send a window update
91    pub is_pending_window_update: bool,
92
93    /// The time when this stream may have been locally reset.
94    pub reset_at: Option<Instant>,
95
96    /// Next node in list of reset streams that should expire eventually
97    pub next_reset_expire: Option<store::Key>,
98
99    /// Frames pending for this stream to read
100    pub pending_recv: buffer::Deque,
101
102    /// Starts as `true` in `Stream::new`.
    /// NOTE(review): presumably cleared once the `RecvStream` handle is
    /// dropped, after which no further data should be received -- confirm.
103    pub is_recv: bool,
104
105    /// Task tracking receiving frames
106    pub recv_task: Option<Waker>,
107
108    /// The stream's pending push promises
109    pub pending_push_promises: store::Queue<NextAccept>,
110
111    /// Validate content-length headers
112    pub content_length: ContentLength,
113}
114
115/// State related to validating a stream's content-length
116#[derive(Debug)]
117pub enum ContentLength {
    /// No length is tracked: both `Stream::dec_content_length` and
    /// `Stream::ensure_content_length_zero` succeed unconditionally. This is
    /// the value a stream starts with in `Stream::new`.
118    Omitted,
    /// `Stream::dec_content_length` rejects any non-zero length.
    /// NOTE(review): presumably used for messages that must not carry a body
    /// (e.g. HEAD) -- confirm against the code that sets it.
119    Head,
    /// Number of bytes still expected. `Stream::dec_content_length` subtracts
    /// from it and `Stream::ensure_content_length_zero` requires it to be `0`.
120    Remaining(u64),
121}
122
/// Marker type whose `store::Next` impl links streams through
/// `Stream::next_pending_accept` / `Stream::is_pending_accept`.
123#[derive(Debug)]
124pub(super) struct NextAccept;
125
/// Marker type whose `store::Next` impl links streams through
/// `Stream::next_pending_send` / `Stream::is_pending_send`.
126#[derive(Debug)]
127pub(super) struct NextSend;
128
/// Marker type whose `store::Next` impl links streams through
/// `Stream::next_pending_send_capacity` / `Stream::is_pending_send_capacity`.
129#[derive(Debug)]
130pub(super) struct NextSendCapacity;
131
/// Marker type whose `store::Next` impl links streams through
/// `Stream::next_window_update` / `Stream::is_pending_window_update`.
132#[derive(Debug)]
133pub(super) struct NextWindowUpdate;
134
/// Marker type whose `store::Next` impl links streams through
/// `Stream::next_open` / `Stream::is_pending_open`.
135#[derive(Debug)]
136pub(super) struct NextOpen;
137
/// Marker type whose `store::Next` impl links streams through
/// `Stream::next_reset_expire`, using `Stream::reset_at` as the queued flag.
138#[derive(Debug)]
139pub(super) struct NextResetExpire;
140
141impl Stream {
142    pub fn new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream {
143        let mut send_flow = FlowControl::new();
144        let mut recv_flow = FlowControl::new();
145
146        recv_flow
147            .inc_window(init_recv_window)
148            .expect("invalid initial receive window");
149        // TODO: proper error handling?
150        let _res = recv_flow.assign_capacity(init_recv_window);
151        debug_assert!(_res.is_ok());
152
153        send_flow
154            .inc_window(init_send_window)
155            .expect("invalid initial send window size");
156
157        Stream {
158            id,
159            state: State::default(),
160            ref_count: 0,
161            is_counted: false,
162
163            // ===== Fields related to sending =====
164            next_pending_send: None,
165            is_pending_send: false,
166            send_flow,
167            requested_send_capacity: 0,
168            buffered_send_data: 0,
169            send_task: None,
170            pending_send: buffer::Deque::new(),
171            is_pending_send_capacity: false,
172            next_pending_send_capacity: None,
173            send_capacity_inc: false,
174            is_pending_open: false,
175            next_open: None,
176            is_pending_push: false,
177
178            // ===== Fields related to receiving =====
179            next_pending_accept: None,
180            is_pending_accept: false,
181            recv_flow,
182            in_flight_recv_data: 0,
183            next_window_update: None,
184            is_pending_window_update: false,
185            reset_at: None,
186            next_reset_expire: None,
187            pending_recv: buffer::Deque::new(),
188            is_recv: true,
189            recv_task: None,
190            pending_push_promises: store::Queue::new(),
191            content_length: ContentLength::Omitted,
192        }
193    }
194
195    /// Increment the stream's ref count
196    pub fn ref_inc(&mut self) {
197        assert!(self.ref_count < usize::MAX);
198        self.ref_count += 1;
199    }
200
201    /// Decrements the stream's ref count
202    pub fn ref_dec(&mut self) {
203        assert!(self.ref_count > 0);
204        self.ref_count -= 1;
205    }
206
207    /// Returns true if stream is currently being held for some time because of
208    /// a local reset.
209    pub fn is_pending_reset_expiration(&self) -> bool {
210        self.reset_at.is_some()
211    }
212
213    /// Returns true if frames for this stream are ready to be sent over the wire
214    pub fn is_send_ready(&self) -> bool {
215        // Why do we check pending_open?
216        //
217        // We allow users to call send_request() which schedules a stream to be pending_open
218        // if there is no room according to the concurrency limit (max_send_streams), and we
219        // also allow data to be buffered for send with send_data() if there is no capacity for
220        // the stream to send the data, which attempts to place the stream in pending_send.
221        // If the stream is not open, we don't want the stream to be scheduled for
222        // execution (pending_send). Note that if the stream is in pending_open, it will be
223        // pushed to pending_send when there is room for an open stream.
224        //
225        // In pending_push we track whether a PushPromise still needs to be sent
226        // from a different stream before we can start sending frames on this one.
227        // This is different from the "open" check because reserved streams don't count
228        // toward the concurrency limit.
229        // See https://httpwg.org/specs/rfc7540.html#rfc.section.5.1.2
230        !self.is_pending_open && !self.is_pending_push
231    }
232
233    /// Returns true if the stream is closed
234    pub fn is_closed(&self) -> bool {
235        // The state has fully transitioned to closed.
236        self.state.is_closed() &&
237            // Because outbound frames transition the stream state before being
238            // buffered, we have to ensure that all frames have been flushed.
239            self.pending_send.is_empty() &&
240            // Sometimes large data frames are sent out in chunks. After a chunk
241            // of the frame is sent, the remainder is pushed back onto the send
242            // queue to be rescheduled.
243            //
244            // Checking for additional buffered data lets us catch this case.
245            self.buffered_send_data == 0
246    }
247
248    /// Returns true if the stream is no longer in use
249    pub fn is_released(&self) -> bool {
250        // The stream is closed and fully flushed
251        self.is_closed() &&
252            // There are no more outstanding references to the stream
253            self.ref_count == 0 &&
254            // The stream is not in any queue
255            !self.is_pending_send && !self.is_pending_send_capacity &&
256            !self.is_pending_accept && !self.is_pending_window_update &&
257            !self.is_pending_open && self.reset_at.is_none()
258    }
259
260    /// Returns true when the consumer of the stream has dropped all handles
261    /// (indicating no further interest in the stream) and the stream state is
262    /// not actually closed.
263    ///
264    /// In this case, a reset should be sent.
265    pub fn is_canceled_interest(&self) -> bool {
266        self.ref_count == 0 && !self.state.is_closed()
267    }
268
269    /// Current available stream send capacity
270    pub fn capacity(&self, max_buffer_size: usize) -> WindowSize {
271        let available = self.send_flow.available().as_size() as usize;
272        let buffered = self.buffered_send_data;
273
274        available.min(max_buffer_size).saturating_sub(buffered) as WindowSize
275    }
276
277    pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) {
278        let prev_capacity = self.capacity(max_buffer_size);
279        debug_assert!(capacity > 0);
280        // TODO: proper error handling
281        let _res = self.send_flow.assign_capacity(capacity);
282        debug_assert!(_res.is_ok());
283
284        tracing::trace!(
285            "  assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
286            self.send_flow.available(),
287            self.buffered_send_data,
288            self.id,
289            max_buffer_size,
290            prev_capacity,
291        );
292
293        if prev_capacity < self.capacity(max_buffer_size) {
294            self.notify_capacity();
295        }
296    }
297
298    pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) {
299        let prev_capacity = self.capacity(max_buffer_size);
300
301        // TODO: proper error handling
302        let _res = self.send_flow.send_data(len);
303        debug_assert!(_res.is_ok());
304
305        // Decrement the stream's buffered data counter
306        debug_assert!(self.buffered_send_data >= len as usize);
307        self.buffered_send_data -= len as usize;
308        self.requested_send_capacity -= len;
309
310        tracing::trace!(
311            "  sent stream data; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
312            self.send_flow.available(),
313            self.buffered_send_data,
314            self.id,
315            max_buffer_size,
316            prev_capacity,
317        );
318
319        if prev_capacity < self.capacity(max_buffer_size) {
320            self.notify_capacity();
321        }
322    }
323
324    /// If the capacity was limited because of the max_send_buffer_size,
325    /// then consider waking the send task again...
326    pub fn notify_capacity(&mut self) {
327        self.send_capacity_inc = true;
328        tracing::trace!("  notifying task");
329        self.notify_send();
330    }
331
332    /// Returns `Err` when the decrement cannot be completed due to overflow.
333    pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> {
334        match self.content_length {
335            ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) {
336                Some(val) => *rem = val,
337                None => return Err(()),
338            },
339            ContentLength::Head => {
340                if len != 0 {
341                    return Err(());
342                }
343            }
344            _ => {}
345        }
346
347        Ok(())
348    }
349
350    pub fn ensure_content_length_zero(&self) -> Result<(), ()> {
351        match self.content_length {
352            ContentLength::Remaining(0) => Ok(()),
353            ContentLength::Remaining(_) => Err(()),
354            _ => Ok(()),
355        }
356    }
357
358    pub fn notify_send(&mut self) {
359        if let Some(task) = self.send_task.take() {
360            task.wake();
361        }
362    }
363
364    pub fn wait_send(&mut self, cx: &Context) {
365        self.send_task = Some(cx.waker().clone());
366    }
367
368    pub fn notify_recv(&mut self) {
369        if let Some(task) = self.recv_task.take() {
370            task.wake();
371        }
372    }
373}
374
375impl store::Next for NextAccept {
376    fn next(stream: &Stream) -> Option<store::Key> {
377        stream.next_pending_accept
378    }
379
380    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
381        stream.next_pending_accept = key;
382    }
383
384    fn take_next(stream: &mut Stream) -> Option<store::Key> {
385        stream.next_pending_accept.take()
386    }
387
388    fn is_queued(stream: &Stream) -> bool {
389        stream.is_pending_accept
390    }
391
392    fn set_queued(stream: &mut Stream, val: bool) {
393        stream.is_pending_accept = val;
394    }
395}
396
397impl store::Next for NextSend {
398    fn next(stream: &Stream) -> Option<store::Key> {
399        stream.next_pending_send
400    }
401
402    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
403        stream.next_pending_send = key;
404    }
405
406    fn take_next(stream: &mut Stream) -> Option<store::Key> {
407        stream.next_pending_send.take()
408    }
409
410    fn is_queued(stream: &Stream) -> bool {
411        stream.is_pending_send
412    }
413
414    fn set_queued(stream: &mut Stream, val: bool) {
415        if val {
416            // ensure that stream is not queued for being opened
417            // if it's being put into queue for sending data
418            debug_assert!(!stream.is_pending_open);
419        }
420        stream.is_pending_send = val;
421    }
422}
423
424impl store::Next for NextSendCapacity {
425    fn next(stream: &Stream) -> Option<store::Key> {
426        stream.next_pending_send_capacity
427    }
428
429    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
430        stream.next_pending_send_capacity = key;
431    }
432
433    fn take_next(stream: &mut Stream) -> Option<store::Key> {
434        stream.next_pending_send_capacity.take()
435    }
436
437    fn is_queued(stream: &Stream) -> bool {
438        stream.is_pending_send_capacity
439    }
440
441    fn set_queued(stream: &mut Stream, val: bool) {
442        stream.is_pending_send_capacity = val;
443    }
444}
445
446impl store::Next for NextWindowUpdate {
447    fn next(stream: &Stream) -> Option<store::Key> {
448        stream.next_window_update
449    }
450
451    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
452        stream.next_window_update = key;
453    }
454
455    fn take_next(stream: &mut Stream) -> Option<store::Key> {
456        stream.next_window_update.take()
457    }
458
459    fn is_queued(stream: &Stream) -> bool {
460        stream.is_pending_window_update
461    }
462
463    fn set_queued(stream: &mut Stream, val: bool) {
464        stream.is_pending_window_update = val;
465    }
466}
467
468impl store::Next for NextOpen {
469    fn next(stream: &Stream) -> Option<store::Key> {
470        stream.next_open
471    }
472
473    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
474        stream.next_open = key;
475    }
476
477    fn take_next(stream: &mut Stream) -> Option<store::Key> {
478        stream.next_open.take()
479    }
480
481    fn is_queued(stream: &Stream) -> bool {
482        stream.is_pending_open
483    }
484
485    fn set_queued(stream: &mut Stream, val: bool) {
486        if val {
487            // ensure that stream is not queued for being sent
488            // if it's being put into queue for opening the stream
489            debug_assert!(!stream.is_pending_send);
490        }
491        stream.is_pending_open = val;
492    }
493}
494
495impl store::Next for NextResetExpire {
496    fn next(stream: &Stream) -> Option<store::Key> {
497        stream.next_reset_expire
498    }
499
500    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
501        stream.next_reset_expire = key;
502    }
503
504    fn take_next(stream: &mut Stream) -> Option<store::Key> {
505        stream.next_reset_expire.take()
506    }
507
508    fn is_queued(stream: &Stream) -> bool {
509        stream.reset_at.is_some()
510    }
511
512    fn set_queued(stream: &mut Stream, val: bool) {
513        if val {
514            stream.reset_at = Some(Instant::now());
515        } else {
516            stream.reset_at = None;
517        }
518    }
519}
520
521// ===== impl ContentLength =====
522
523impl ContentLength {
524    pub fn is_head(&self) -> bool {
525        matches!(*self, Self::Head)
526    }
527}