h2/proto/streams/
state.rs

1use std::io;
2
3use crate::codec::UserError;
4use crate::frame::{self, Reason, StreamId};
5use crate::proto::{self, Error, Initiator, PollReset};
6
7use self::Inner::*;
8use self::Peer::*;
9
10/// Represents the state of an H2 stream
11///
12/// ```not_rust
13///                              +--------+
14///                      send PP |        | recv PP
15///                     ,--------|  idle  |--------.
16///                    /         |        |         \
17///                   v          +--------+          v
18///            +----------+          |           +----------+
19///            |          |          | send H /  |          |
20///     ,------| reserved |          | recv H    | reserved |------.
21///     |      | (local)  |          |           | (remote) |      |
22///     |      +----------+          v           +----------+      |
23///     |          |             +--------+             |          |
24///     |          |     recv ES |        | send ES     |          |
25///     |   send H |     ,-------|  open  |-------.     | recv H   |
26///     |          |    /        |        |        \    |          |
27///     |          v   v         +--------+         v   v          |
28///     |      +----------+          |           +----------+      |
29///     |      |   half   |          |           |   half   |      |
30///     |      |  closed  |          | send R /  |  closed  |      |
31///     |      | (remote) |          | recv R    | (local)  |      |
32///     |      +----------+          |           +----------+      |
33///     |           |                |                 |           |
34///     |           | send ES /      |       recv ES / |           |
35///     |           | send R /       v        send R / |           |
36///     |           | recv R     +--------+   recv R   |           |
37///     | send R /  `----------->|        |<-----------'  send R / |
38///     | recv R                 | closed |               recv R   |
39///     `----------------------->|        |<----------------------'
40///                              +--------+
41///
42///        send:   endpoint sends this frame
43///        recv:   endpoint receives this frame
44///
45///        H:  HEADERS frame (with implied CONTINUATIONs)
46///        PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
47///        ES: END_STREAM flag
48///        R:  RST_STREAM frame
49/// ```
50#[derive(Debug, Clone)]
51pub struct State {
52    inner: Inner,
53}
54
55#[derive(Debug, Clone)]
56enum Inner {
57    Idle,
58    // TODO: these states shouldn't count against concurrency limits:
59    ReservedLocal,
60    ReservedRemote,
61    Open { local: Peer, remote: Peer },
62    HalfClosedLocal(Peer), // TODO: explicitly name this value
63    HalfClosedRemote(Peer),
64    Closed(Cause),
65}
66
/// Header progress of one direction of a stream.
#[derive(Clone, Copy, Debug, Default)]
enum Peer {
    /// HEADERS for this direction are still pending. A received
    /// informational (1xx) response leaves the direction in this state.
    #[default]
    AwaitingHeaders,
    /// HEADERS for this direction have been sent or received.
    Streaming,
}
73
74#[derive(Debug, Clone)]
75enum Cause {
76    EndStream,
77    Error(Error),
78
79    /// This indicates to the connection that a reset frame must be sent out
80    /// once the send queue has been flushed.
81    ///
82    /// Examples of when this could happen:
83    /// - User drops all references to a stream, so we want to CANCEL the it.
84    /// - Header block size was too large, so we want to REFUSE, possibly
85    ///   after sending a 431 response frame.
86    ScheduledLibraryReset(Reason),
87}
88
89impl State {
90    /// Opens the send-half of a stream if it is not already open.
91    pub fn send_open(&mut self, eos: bool) -> Result<(), UserError> {
92        let local = Streaming;
93
94        self.inner = match self.inner {
95            Idle => {
96                if eos {
97                    HalfClosedLocal(AwaitingHeaders)
98                } else {
99                    Open {
100                        local,
101                        remote: AwaitingHeaders,
102                    }
103                }
104            }
105            Open {
106                local: AwaitingHeaders,
107                remote,
108            } => {
109                if eos {
110                    HalfClosedLocal(remote)
111                } else {
112                    Open { local, remote }
113                }
114            }
115            HalfClosedRemote(AwaitingHeaders) | ReservedLocal => {
116                if eos {
117                    Closed(Cause::EndStream)
118                } else {
119                    HalfClosedRemote(local)
120                }
121            }
122            _ => {
123                // All other transitions result in a protocol error
124                return Err(UserError::UnexpectedFrameType);
125            }
126        };
127
128        Ok(())
129    }
130
131    /// Opens the receive-half of the stream when a HEADERS frame is received.
132    ///
133    /// Returns true if this transitions the state to Open.
134    pub fn recv_open(&mut self, frame: &frame::Headers) -> Result<bool, Error> {
135        let mut initial = false;
136        let eos = frame.is_end_stream();
137
138        self.inner = match self.inner {
139            Idle => {
140                initial = true;
141
142                if eos {
143                    HalfClosedRemote(AwaitingHeaders)
144                } else {
145                    Open {
146                        local: AwaitingHeaders,
147                        remote: if frame.is_informational() {
148                            tracing::trace!("skipping 1xx response headers");
149                            AwaitingHeaders
150                        } else {
151                            Streaming
152                        },
153                    }
154                }
155            }
156            ReservedRemote => {
157                initial = true;
158
159                if eos {
160                    Closed(Cause::EndStream)
161                } else if frame.is_informational() {
162                    tracing::trace!("skipping 1xx response headers");
163                    ReservedRemote
164                } else {
165                    HalfClosedLocal(Streaming)
166                }
167            }
168            Open {
169                local,
170                remote: AwaitingHeaders,
171            } => {
172                if eos {
173                    HalfClosedRemote(local)
174                } else {
175                    Open {
176                        local,
177                        remote: if frame.is_informational() {
178                            tracing::trace!("skipping 1xx response headers");
179                            AwaitingHeaders
180                        } else {
181                            Streaming
182                        },
183                    }
184                }
185            }
186            HalfClosedLocal(AwaitingHeaders) => {
187                if eos {
188                    Closed(Cause::EndStream)
189                } else if frame.is_informational() {
190                    tracing::trace!("skipping 1xx response headers");
191                    HalfClosedLocal(AwaitingHeaders)
192                } else {
193                    HalfClosedLocal(Streaming)
194                }
195            }
196            ref state => {
197                // All other transitions result in a protocol error
198                proto_err!(conn: "recv_open: in unexpected state {:?}", state);
199                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
200            }
201        };
202
203        Ok(initial)
204    }
205
206    /// Transition from Idle -> ReservedRemote
207    pub fn reserve_remote(&mut self) -> Result<(), Error> {
208        match self.inner {
209            Idle => {
210                self.inner = ReservedRemote;
211                Ok(())
212            }
213            ref state => {
214                proto_err!(conn: "reserve_remote: in unexpected state {:?}", state);
215                Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
216            }
217        }
218    }
219
220    /// Transition from Idle -> ReservedLocal
221    pub fn reserve_local(&mut self) -> Result<(), UserError> {
222        match self.inner {
223            Idle => {
224                self.inner = ReservedLocal;
225                Ok(())
226            }
227            _ => Err(UserError::UnexpectedFrameType),
228        }
229    }
230
231    /// Indicates that the remote side will not send more data to the local.
232    pub fn recv_close(&mut self) -> Result<(), Error> {
233        match self.inner {
234            Open { local, .. } => {
235                // The remote side will continue to receive data.
236                tracing::trace!("recv_close: Open => HalfClosedRemote({:?})", local);
237                self.inner = HalfClosedRemote(local);
238                Ok(())
239            }
240            HalfClosedLocal(..) => {
241                tracing::trace!("recv_close: HalfClosedLocal => Closed");
242                self.inner = Closed(Cause::EndStream);
243                Ok(())
244            }
245            ref state => {
246                proto_err!(conn: "recv_close: in unexpected state {:?}", state);
247                Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
248            }
249        }
250    }
251
252    /// The remote explicitly sent a RST_STREAM.
253    ///
254    /// # Arguments
255    /// - `frame`: the received RST_STREAM frame.
256    /// - `queued`: true if this stream has frames in the pending send queue.
257    pub fn recv_reset(&mut self, frame: frame::Reset, queued: bool) {
258        match self.inner {
259            // If the stream is already in a `Closed` state, do nothing,
260            // provided that there are no frames still in the send queue.
261            Closed(..) if !queued => {}
262            // A notionally `Closed` stream may still have queued frames in
263            // the following cases:
264            //
265            // - if the cause is `Cause::Scheduled(..)` (i.e. we have not
266            //   actually closed the stream yet).
267            // - if the cause is `Cause::EndStream`: we transition to this
268            //   state when an EOS frame is *enqueued* (so that it's invalid
269            //   to enqueue more frames), not when the EOS frame is *sent*;
270            //   therefore, there may still be frames ahead of the EOS frame
271            //   in the send queue.
272            //
273            // In either of these cases, we want to overwrite the stream's
274            // previous state with the received RST_STREAM, so that the queue
275            // will be cleared by `Prioritize::pop_frame`.
276            ref state => {
277                tracing::trace!(
278                    "recv_reset; frame={:?}; state={:?}; queued={:?}",
279                    frame,
280                    state,
281                    queued
282                );
283                self.inner = Closed(Cause::Error(Error::remote_reset(
284                    frame.stream_id(),
285                    frame.reason(),
286                )));
287            }
288        }
289    }
290
291    /// Handle a connection-level error.
292    pub fn handle_error(&mut self, err: &proto::Error) {
293        match self.inner {
294            Closed(..) => {}
295            _ => {
296                tracing::trace!("handle_error; err={:?}", err);
297                self.inner = Closed(Cause::Error(err.clone()));
298            }
299        }
300    }
301
302    pub fn recv_eof(&mut self) {
303        match self.inner {
304            Closed(..) => {}
305            ref state => {
306                tracing::trace!("recv_eof; state={:?}", state);
307                self.inner = Closed(Cause::Error(
308                    io::Error::new(
309                        io::ErrorKind::BrokenPipe,
310                        "stream closed because of a broken pipe",
311                    )
312                    .into(),
313                ));
314            }
315        }
316    }
317
318    /// Indicates that the local side will not send more data to the local.
319    pub fn send_close(&mut self) {
320        match self.inner {
321            Open { remote, .. } => {
322                // The remote side will continue to receive data.
323                tracing::trace!("send_close: Open => HalfClosedLocal({:?})", remote);
324                self.inner = HalfClosedLocal(remote);
325            }
326            HalfClosedRemote(..) => {
327                tracing::trace!("send_close: HalfClosedRemote => Closed");
328                self.inner = Closed(Cause::EndStream);
329            }
330            ref state => panic!("send_close: unexpected state {:?}", state),
331        }
332    }
333
334    /// Set the stream state to reset locally.
335    pub fn set_reset(&mut self, stream_id: StreamId, reason: Reason, initiator: Initiator) {
336        self.inner = Closed(Cause::Error(Error::Reset(stream_id, reason, initiator)));
337    }
338
339    /// Set the stream state to a scheduled reset.
340    pub fn set_scheduled_reset(&mut self, reason: Reason) {
341        debug_assert!(!self.is_closed());
342        self.inner = Closed(Cause::ScheduledLibraryReset(reason));
343    }
344
345    pub fn get_scheduled_reset(&self) -> Option<Reason> {
346        match self.inner {
347            Closed(Cause::ScheduledLibraryReset(reason)) => Some(reason),
348            _ => None,
349        }
350    }
351
352    pub fn is_scheduled_reset(&self) -> bool {
353        matches!(self.inner, Closed(Cause::ScheduledLibraryReset(..)))
354    }
355
356    pub fn is_local_error(&self) -> bool {
357        match self.inner {
358            Closed(Cause::Error(ref e)) => e.is_local(),
359            Closed(Cause::ScheduledLibraryReset(..)) => true,
360            _ => false,
361        }
362    }
363
364    pub fn is_remote_reset(&self) -> bool {
365        matches!(
366            self.inner,
367            Closed(Cause::Error(Error::Reset(_, _, Initiator::Remote)))
368        )
369    }
370
371    /// Returns true if the stream is already reset.
372    pub fn is_reset(&self) -> bool {
373        match self.inner {
374            Closed(Cause::EndStream) => false,
375            Closed(_) => true,
376            _ => false,
377        }
378    }
379
380    pub fn is_send_streaming(&self) -> bool {
381        matches!(
382            self.inner,
383            Open {
384                local: Streaming,
385                ..
386            } | HalfClosedRemote(Streaming)
387        )
388    }
389
390    /// Returns true when the stream is in a state to receive headers
391    pub fn is_recv_headers(&self) -> bool {
392        matches!(
393            self.inner,
394            Idle | Open {
395                remote: AwaitingHeaders,
396                ..
397            } | HalfClosedLocal(AwaitingHeaders)
398                | ReservedRemote
399        )
400    }
401
402    pub fn is_recv_streaming(&self) -> bool {
403        matches!(
404            self.inner,
405            Open {
406                remote: Streaming,
407                ..
408            } | HalfClosedLocal(Streaming)
409        )
410    }
411
412    pub fn is_closed(&self) -> bool {
413        matches!(self.inner, Closed(_))
414    }
415
416    pub fn is_recv_closed(&self) -> bool {
417        matches!(
418            self.inner,
419            Closed(..) | HalfClosedRemote(..) | ReservedLocal
420        )
421    }
422
423    pub fn is_send_closed(&self) -> bool {
424        matches!(
425            self.inner,
426            Closed(..) | HalfClosedLocal(..) | ReservedRemote
427        )
428    }
429
430    pub fn is_idle(&self) -> bool {
431        matches!(self.inner, Idle)
432    }
433
434    pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> {
435        // TODO: Is this correct?
436        match self.inner {
437            Closed(Cause::Error(ref e)) => Err(e.clone()),
438            Closed(Cause::ScheduledLibraryReset(reason)) => {
439                Err(proto::Error::library_go_away(reason))
440            }
441            Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false),
442            _ => Ok(true),
443        }
444    }
445
446    /// Returns a reason if the stream has been reset.
447    pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error> {
448        match self.inner {
449            Closed(Cause::Error(Error::Reset(_, reason, _)))
450            | Closed(Cause::Error(Error::GoAway(_, reason, _)))
451            | Closed(Cause::ScheduledLibraryReset(reason)) => Ok(Some(reason)),
452            Closed(Cause::Error(ref e)) => Err(e.clone().into()),
453            Open {
454                local: Streaming, ..
455            }
456            | HalfClosedRemote(Streaming) => match mode {
457                PollReset::AwaitingHeaders => Err(UserError::PollResetAfterSendResponse.into()),
458                PollReset::Streaming => Ok(None),
459            },
460            _ => Ok(None),
461        }
462    }
463}
464
465impl Default for State {
466    fn default() -> State {
467        State { inner: Inner::Idle }
468    }
469}