hyper/proto/h2/
mod.rs

1use std::error::Error as StdError;
2use std::future::Future;
3use std::io::{Cursor, IoSlice};
4use std::mem;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use bytes::{Buf, Bytes};
9use futures_util::ready;
10use h2::{Reason, RecvStream, SendStream};
11use http::header::{HeaderName, CONNECTION, TE, TRANSFER_ENCODING, UPGRADE};
12use http::HeaderMap;
13use pin_project_lite::pin_project;
14
15use crate::body::Body;
16use crate::proto::h2::ping::Recorder;
17use crate::rt::{Read, ReadBufCursor, Write};
18
19pub(crate) mod ping;
20
21cfg_client! {
22    pub(crate) mod client;
23    pub(crate) use self::client::ClientTask;
24}
25
26cfg_server! {
27    pub(crate) mod server;
28    pub(crate) use self::server::Server;
29}
30
31/// Default initial stream window size defined in HTTP2 spec.
32pub(crate) const SPEC_WINDOW_SIZE: u32 = 65_535;
33
34// List of connection headers from RFC 9110 Section 7.6.1
35//
36// TE headers are allowed in HTTP/2 requests as long as the value is "trailers", so they're
37// tested separately.
38static CONNECTION_HEADERS: [HeaderName; 4] = [
39    HeaderName::from_static("keep-alive"),
40    HeaderName::from_static("proxy-connection"),
41    TRANSFER_ENCODING,
42    UPGRADE,
43];
44
45fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) {
46    for header in &CONNECTION_HEADERS {
47        if headers.remove(header).is_some() {
48            warn!("Connection header illegal in HTTP/2: {}", header.as_str());
49        }
50    }
51
52    if is_request {
53        if headers
54            .get(TE)
55            .map_or(false, |te_header| te_header != "trailers")
56        {
57            warn!("TE headers not set to \"trailers\" are illegal in HTTP/2 requests");
58            headers.remove(TE);
59        }
60    } else if headers.remove(TE).is_some() {
61        warn!("TE headers illegal in HTTP/2 responses");
62    }
63
64    if let Some(header) = headers.remove(CONNECTION) {
65        warn!(
66            "Connection header illegal in HTTP/2: {}",
67            CONNECTION.as_str()
68        );
69        let header_contents = header.to_str().unwrap();
70
71        // A `Connection` header may have a comma-separated list of names of other headers that
72        // are meant for only this specific connection.
73        //
74        // Iterate these names and remove them as headers. Connection-specific headers are
75        // forbidden in HTTP2, as that information has been moved into frame types of the h2
76        // protocol.
77        for name in header_contents.split(',') {
78            let name = name.trim();
79            headers.remove(name);
80        }
81    }
82}
83
84// body adapters used by both Client and Server
85
86pin_project! {
87    pub(crate) struct PipeToSendStream<S>
88    where
89        S: Body,
90    {
91        body_tx: SendStream<SendBuf<S::Data>>,
92        data_done: bool,
93        #[pin]
94        stream: S,
95    }
96}
97
98impl<S> PipeToSendStream<S>
99where
100    S: Body,
101{
102    fn new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> {
103        PipeToSendStream {
104            body_tx: tx,
105            data_done: false,
106            stream,
107        }
108    }
109}
110
111impl<S> Future for PipeToSendStream<S>
112where
113    S: Body,
114    S::Error: Into<Box<dyn StdError + Send + Sync>>,
115{
116    type Output = crate::Result<()>;
117
118    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
119        let mut me = self.project();
120        loop {
121            // we don't have the next chunk of data yet, so just reserve 1 byte to make
122            // sure there's some capacity available. h2 will handle the capacity management
123            // for the actual body chunk.
124            me.body_tx.reserve_capacity(1);
125
126            if me.body_tx.capacity() == 0 {
127                loop {
128                    match ready!(me.body_tx.poll_capacity(cx)) {
129                        Some(Ok(0)) => {}
130                        Some(Ok(_)) => break,
131                        Some(Err(e)) => return Poll::Ready(Err(crate::Error::new_body_write(e))),
132                        None => {
133                            // None means the stream is no longer in a
134                            // streaming state, we either finished it
135                            // somehow, or the remote reset us.
136                            return Poll::Ready(Err(crate::Error::new_body_write(
137                                "send stream capacity unexpectedly closed",
138                            )));
139                        }
140                    }
141                }
142            } else if let Poll::Ready(reason) = me
143                .body_tx
144                .poll_reset(cx)
145                .map_err(crate::Error::new_body_write)?
146            {
147                debug!("stream received RST_STREAM: {:?}", reason);
148                return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(reason))));
149            }
150
151            match ready!(me.stream.as_mut().poll_frame(cx)) {
152                Some(Ok(frame)) => {
153                    if frame.is_data() {
154                        let chunk = frame.into_data().unwrap_or_else(|_| unreachable!());
155                        let is_eos = me.stream.is_end_stream();
156                        trace!(
157                            "send body chunk: {} bytes, eos={}",
158                            chunk.remaining(),
159                            is_eos,
160                        );
161
162                        let buf = SendBuf::Buf(chunk);
163                        me.body_tx
164                            .send_data(buf, is_eos)
165                            .map_err(crate::Error::new_body_write)?;
166
167                        if is_eos {
168                            return Poll::Ready(Ok(()));
169                        }
170                    } else if frame.is_trailers() {
171                        // no more DATA, so give any capacity back
172                        me.body_tx.reserve_capacity(0);
173                        me.body_tx
174                            .send_trailers(frame.into_trailers().unwrap_or_else(|_| unreachable!()))
175                            .map_err(crate::Error::new_body_write)?;
176                        return Poll::Ready(Ok(()));
177                    } else {
178                        trace!("discarding unknown frame");
179                        // loop again
180                    }
181                }
182                Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))),
183                None => {
184                    // no more frames means we're done here
185                    // but at this point, we haven't sent an EOS DATA, or
186                    // any trailers, so send an empty EOS DATA.
187                    return Poll::Ready(me.body_tx.send_eos_frame());
188                }
189            }
190        }
191    }
192}
193
194trait SendStreamExt {
195    fn on_user_err<E>(&mut self, err: E) -> crate::Error
196    where
197        E: Into<Box<dyn std::error::Error + Send + Sync>>;
198    fn send_eos_frame(&mut self) -> crate::Result<()>;
199}
200
201impl<B: Buf> SendStreamExt for SendStream<SendBuf<B>> {
202    fn on_user_err<E>(&mut self, err: E) -> crate::Error
203    where
204        E: Into<Box<dyn std::error::Error + Send + Sync>>,
205    {
206        let err = crate::Error::new_user_body(err);
207        debug!("send body user stream error: {}", err);
208        self.send_reset(err.h2_reason());
209        err
210    }
211
212    fn send_eos_frame(&mut self) -> crate::Result<()> {
213        trace!("send body eos");
214        self.send_data(SendBuf::None, true)
215            .map_err(crate::Error::new_body_write)
216    }
217}
218
219#[repr(usize)]
220enum SendBuf<B> {
221    Buf(B),
222    Cursor(Cursor<Box<[u8]>>),
223    None,
224}
225
226impl<B: Buf> Buf for SendBuf<B> {
227    #[inline]
228    fn remaining(&self) -> usize {
229        match *self {
230            Self::Buf(ref b) => b.remaining(),
231            Self::Cursor(ref c) => Buf::remaining(c),
232            Self::None => 0,
233        }
234    }
235
236    #[inline]
237    fn chunk(&self) -> &[u8] {
238        match *self {
239            Self::Buf(ref b) => b.chunk(),
240            Self::Cursor(ref c) => c.chunk(),
241            Self::None => &[],
242        }
243    }
244
245    #[inline]
246    fn advance(&mut self, cnt: usize) {
247        match *self {
248            Self::Buf(ref mut b) => b.advance(cnt),
249            Self::Cursor(ref mut c) => c.advance(cnt),
250            Self::None => {}
251        }
252    }
253
254    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
255        match *self {
256            Self::Buf(ref b) => b.chunks_vectored(dst),
257            Self::Cursor(ref c) => c.chunks_vectored(dst),
258            Self::None => 0,
259        }
260    }
261}
262
263struct H2Upgraded<B>
264where
265    B: Buf,
266{
267    ping: Recorder,
268    send_stream: UpgradedSendStream<B>,
269    recv_stream: RecvStream,
270    buf: Bytes,
271}
272
273impl<B> Read for H2Upgraded<B>
274where
275    B: Buf,
276{
277    fn poll_read(
278        mut self: Pin<&mut Self>,
279        cx: &mut Context<'_>,
280        mut read_buf: ReadBufCursor<'_>,
281    ) -> Poll<Result<(), std::io::Error>> {
282        if self.buf.is_empty() {
283            self.buf = loop {
284                match ready!(self.recv_stream.poll_data(cx)) {
285                    None => return Poll::Ready(Ok(())),
286                    Some(Ok(buf)) if buf.is_empty() && !self.recv_stream.is_end_stream() => {
287                        continue
288                    }
289                    Some(Ok(buf)) => {
290                        self.ping.record_data(buf.len());
291                        break buf;
292                    }
293                    Some(Err(e)) => {
294                        return Poll::Ready(match e.reason() {
295                            Some(Reason::NO_ERROR) | Some(Reason::CANCEL) => Ok(()),
296                            Some(Reason::STREAM_CLOSED) => {
297                                Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))
298                            }
299                            _ => Err(h2_to_io_error(e)),
300                        })
301                    }
302                }
303            };
304        }
305        let cnt = std::cmp::min(self.buf.len(), read_buf.remaining());
306        read_buf.put_slice(&self.buf[..cnt]);
307        self.buf.advance(cnt);
308        let _ = self.recv_stream.flow_control().release_capacity(cnt);
309        Poll::Ready(Ok(()))
310    }
311}
312
313impl<B> Write for H2Upgraded<B>
314where
315    B: Buf,
316{
317    fn poll_write(
318        mut self: Pin<&mut Self>,
319        cx: &mut Context<'_>,
320        buf: &[u8],
321    ) -> Poll<Result<usize, std::io::Error>> {
322        if buf.is_empty() {
323            return Poll::Ready(Ok(0));
324        }
325        self.send_stream.reserve_capacity(buf.len());
326
327        // We ignore all errors returned by `poll_capacity` and `write`, as we
328        // will get the correct from `poll_reset` anyway.
329        let cnt = match ready!(self.send_stream.poll_capacity(cx)) {
330            None => Some(0),
331            Some(Ok(cnt)) => self
332                .send_stream
333                .write(&buf[..cnt], false)
334                .ok()
335                .map(|()| cnt),
336            Some(Err(_)) => None,
337        };
338
339        if let Some(cnt) = cnt {
340            return Poll::Ready(Ok(cnt));
341        }
342
343        Poll::Ready(Err(h2_to_io_error(
344            match ready!(self.send_stream.poll_reset(cx)) {
345                Ok(Reason::NO_ERROR) | Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => {
346                    return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into()))
347                }
348                Ok(reason) => reason.into(),
349                Err(e) => e,
350            },
351        )))
352    }
353
354    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
355        Poll::Ready(Ok(()))
356    }
357
358    fn poll_shutdown(
359        mut self: Pin<&mut Self>,
360        cx: &mut Context<'_>,
361    ) -> Poll<Result<(), std::io::Error>> {
362        if self.send_stream.write(&[], true).is_ok() {
363            return Poll::Ready(Ok(()));
364        }
365
366        Poll::Ready(Err(h2_to_io_error(
367            match ready!(self.send_stream.poll_reset(cx)) {
368                Ok(Reason::NO_ERROR) => return Poll::Ready(Ok(())),
369                Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => {
370                    return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into()))
371                }
372                Ok(reason) => reason.into(),
373                Err(e) => e,
374            },
375        )))
376    }
377}
378
379fn h2_to_io_error(e: h2::Error) -> std::io::Error {
380    if e.is_io() {
381        e.into_io().unwrap()
382    } else {
383        std::io::Error::new(std::io::ErrorKind::Other, e)
384    }
385}
386
387struct UpgradedSendStream<B>(SendStream<SendBuf<Neutered<B>>>);
388
389impl<B> UpgradedSendStream<B>
390where
391    B: Buf,
392{
393    unsafe fn new(inner: SendStream<SendBuf<B>>) -> Self {
394        assert_eq!(mem::size_of::<B>(), mem::size_of::<Neutered<B>>());
395        Self(mem::transmute(inner))
396    }
397
398    fn reserve_capacity(&mut self, cnt: usize) {
399        unsafe { self.as_inner_unchecked().reserve_capacity(cnt) }
400    }
401
402    fn poll_capacity(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<usize, h2::Error>>> {
403        unsafe { self.as_inner_unchecked().poll_capacity(cx) }
404    }
405
406    fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<h2::Reason, h2::Error>> {
407        unsafe { self.as_inner_unchecked().poll_reset(cx) }
408    }
409
410    fn write(&mut self, buf: &[u8], end_of_stream: bool) -> Result<(), std::io::Error> {
411        let send_buf = SendBuf::Cursor(Cursor::new(buf.into()));
412        unsafe {
413            self.as_inner_unchecked()
414                .send_data(send_buf, end_of_stream)
415                .map_err(h2_to_io_error)
416        }
417    }
418
419    unsafe fn as_inner_unchecked(&mut self) -> &mut SendStream<SendBuf<B>> {
420        &mut *(&mut self.0 as *mut _ as *mut _)
421    }
422}
423
424#[repr(transparent)]
425struct Neutered<B> {
426    _inner: B,
427    impossible: Impossible,
428}
429
430enum Impossible {}
431
432unsafe impl<B> Send for Neutered<B> {}
433
434impl<B> Buf for Neutered<B> {
435    fn remaining(&self) -> usize {
436        match self.impossible {}
437    }
438
439    fn chunk(&self) -> &[u8] {
440        match self.impossible {}
441    }
442
443    fn advance(&mut self, _cnt: usize) {
444        match self.impossible {}
445    }
446}