h2/proto/
go_away.rs

1use crate::codec::Codec;
2use crate::frame::{self, Reason, StreamId};
3
4use bytes::Buf;
5use std::io;
6use std::task::{Context, Poll};
7use tokio::io::AsyncWrite;
8
9/// Manages our sending of GOAWAY frames.
10#[derive(Debug)]
11pub(super) struct GoAway {
12    /// Whether the connection should close now, or wait until idle.
13    close_now: bool,
14    /// Records if we've sent any GOAWAY before.
15    going_away: Option<GoingAway>,
16    /// Whether the user started the GOAWAY by calling `abrupt_shutdown`.
17    is_user_initiated: bool,
18    /// A GOAWAY frame that must be buffered in the Codec immediately.
19    pending: Option<frame::GoAway>,
20}
21
22/// Keeps a memory of any GOAWAY frames we've sent before.
23///
24/// This looks very similar to a `frame::GoAway`, but is a separate type. Why?
25/// Mostly for documentation purposes. This type is to record status. If it
26/// were a `frame::GoAway`, it might appear like we eventually wanted to
27/// serialize it. We **only** want to be able to look up these fields at a
28/// later time.
29#[derive(Debug)]
30pub(crate) struct GoingAway {
31    /// Stores the highest stream ID of a GOAWAY that has been sent.
32    ///
33    /// It's illegal to send a subsequent GOAWAY with a higher ID.
34    last_processed_id: StreamId,
35
36    /// Records the error code of any GOAWAY frame sent.
37    reason: Reason,
38}
39
40impl GoAway {
41    pub fn new() -> Self {
42        GoAway {
43            close_now: false,
44            going_away: None,
45            is_user_initiated: false,
46            pending: None,
47        }
48    }
49
50    /// Enqueue a GOAWAY frame to be written.
51    ///
52    /// The connection is expected to continue to run until idle.
53    pub fn go_away(&mut self, f: frame::GoAway) {
54        if let Some(ref going_away) = self.going_away {
55            assert!(
56                f.last_stream_id() <= going_away.last_processed_id,
57                "GOAWAY stream IDs shouldn't be higher; \
58                 last_processed_id = {:?}, f.last_stream_id() = {:?}",
59                going_away.last_processed_id,
60                f.last_stream_id(),
61            );
62        }
63
64        self.going_away = Some(GoingAway {
65            last_processed_id: f.last_stream_id(),
66            reason: f.reason(),
67        });
68        self.pending = Some(f);
69    }
70
71    pub fn go_away_now(&mut self, f: frame::GoAway) {
72        self.close_now = true;
73        if let Some(ref going_away) = self.going_away {
74            // Prevent sending the same GOAWAY twice.
75            if going_away.last_processed_id == f.last_stream_id() && going_away.reason == f.reason()
76            {
77                return;
78            }
79        }
80        self.go_away(f);
81    }
82
83    pub fn go_away_from_user(&mut self, f: frame::GoAway) {
84        self.is_user_initiated = true;
85        self.go_away_now(f);
86    }
87
88    /// Return if a GOAWAY has ever been scheduled.
89    pub fn is_going_away(&self) -> bool {
90        self.going_away.is_some()
91    }
92
93    pub fn is_user_initiated(&self) -> bool {
94        self.is_user_initiated
95    }
96
97    /// Returns the going away info, if any.
98    pub fn going_away(&self) -> Option<&GoingAway> {
99        self.going_away.as_ref()
100    }
101
102    /// Returns if the connection should close now, or wait until idle.
103    pub fn should_close_now(&self) -> bool {
104        self.pending.is_none() && self.close_now
105    }
106
107    /// Returns if the connection should be closed when idle.
108    pub fn should_close_on_idle(&self) -> bool {
109        !self.close_now
110            && self
111                .going_away
112                .as_ref()
113                .map(|g| g.last_processed_id != StreamId::MAX)
114                .unwrap_or(false)
115    }
116
117    /// Try to write a pending GOAWAY frame to the buffer.
118    ///
119    /// If a frame is written, the `Reason` of the GOAWAY is returned.
120    pub fn send_pending_go_away<T, B>(
121        &mut self,
122        cx: &mut Context,
123        dst: &mut Codec<T, B>,
124    ) -> Poll<Option<io::Result<Reason>>>
125    where
126        T: AsyncWrite + Unpin,
127        B: Buf,
128    {
129        if let Some(frame) = self.pending.take() {
130            if !dst.poll_ready(cx)?.is_ready() {
131                self.pending = Some(frame);
132                return Poll::Pending;
133            }
134
135            let reason = frame.reason();
136            dst.buffer(frame.into()).expect("invalid GOAWAY frame");
137
138            return Poll::Ready(Some(Ok(reason)));
139        } else if self.should_close_now() {
140            return match self.going_away().map(|going_away| going_away.reason) {
141                Some(reason) => Poll::Ready(Some(Ok(reason))),
142                None => Poll::Ready(None),
143            };
144        }
145
146        Poll::Ready(None)
147    }
148}
149
150impl GoingAway {
151    pub(crate) fn reason(&self) -> Reason {
152        self.reason
153    }
154}