1use std::error::Error as StdError;
2use std::future::Future;
3use std::io::{Cursor, IoSlice};
4use std::mem;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use bytes::{Buf, Bytes};
9use futures_util::ready;
10use h2::{Reason, RecvStream, SendStream};
11use http::header::{HeaderName, CONNECTION, TE, TRANSFER_ENCODING, UPGRADE};
12use http::HeaderMap;
13use pin_project_lite::pin_project;
14
15use crate::body::Body;
16use crate::proto::h2::ping::Recorder;
17use crate::rt::{Read, ReadBufCursor, Write};
18
19pub(crate) mod ping;
20
21cfg_client! {
22 pub(crate) mod client;
23 pub(crate) use self::client::ClientTask;
24}
25
26cfg_server! {
27 pub(crate) mod server;
28 pub(crate) use self::server::Server;
29}
30
/// Window size constant (65,535) shared across the crate.
///
/// NOTE(review): going by the name, this is the default initial flow-control
/// window size from the HTTP/2 specification — confirm against RFC 9113.
pub(crate) const SPEC_WINDOW_SIZE: u32 = 65_535;
33
/// Header names that `strip_connection_headers` removes unconditionally,
/// logging a warning for each one that was present.
///
/// `TE` and `Connection` are not listed here because
/// `strip_connection_headers` handles them with separate logic.
static CONNECTION_HEADERS: [HeaderName; 4] = [
    HeaderName::from_static("keep-alive"),
    HeaderName::from_static("proxy-connection"),
    TRANSFER_ENCODING,
    UPGRADE,
];
44
45fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) {
46 for header in &CONNECTION_HEADERS {
47 if headers.remove(header).is_some() {
48 warn!("Connection header illegal in HTTP/2: {}", header.as_str());
49 }
50 }
51
52 if is_request {
53 if headers
54 .get(TE)
55 .map_or(false, |te_header| te_header != "trailers")
56 {
57 warn!("TE headers not set to \"trailers\" are illegal in HTTP/2 requests");
58 headers.remove(TE);
59 }
60 } else if headers.remove(TE).is_some() {
61 warn!("TE headers illegal in HTTP/2 responses");
62 }
63
64 if let Some(header) = headers.remove(CONNECTION) {
65 warn!(
66 "Connection header illegal in HTTP/2: {}",
67 CONNECTION.as_str()
68 );
69 let header_contents = header.to_str().unwrap();
70
71 for name in header_contents.split(',') {
78 let name = name.trim();
79 headers.remove(name);
80 }
81 }
82}
83
pin_project! {
    // Future that forwards the frames of a body `S` into an h2 `SendStream`;
    // the forwarding logic lives in this type's `Future` impl.
    pub(crate) struct PipeToSendStream<S>
    where
        S: Body,
    {
        // Sending half of the h2 stream that the body is written to.
        body_tx: SendStream<SendBuf<S::Data>>,
        // Set to `false` by `new`; not read anywhere in the visible part of
        // this file.
        data_done: bool,
        // The body being drained. Pinned because it is polled in place.
        #[pin]
        stream: S,
    }
}
97
98impl<S> PipeToSendStream<S>
99where
100 S: Body,
101{
102 fn new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> {
103 PipeToSendStream {
104 body_tx: tx,
105 data_done: false,
106 stream,
107 }
108 }
109}
110
111impl<S> Future for PipeToSendStream<S>
112where
113 S: Body,
114 S::Error: Into<Box<dyn StdError + Send + Sync>>,
115{
116 type Output = crate::Result<()>;
117
118 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
119 let mut me = self.project();
120 loop {
121 me.body_tx.reserve_capacity(1);
125
126 if me.body_tx.capacity() == 0 {
127 loop {
128 match ready!(me.body_tx.poll_capacity(cx)) {
129 Some(Ok(0)) => {}
130 Some(Ok(_)) => break,
131 Some(Err(e)) => return Poll::Ready(Err(crate::Error::new_body_write(e))),
132 None => {
133 return Poll::Ready(Err(crate::Error::new_body_write(
137 "send stream capacity unexpectedly closed",
138 )));
139 }
140 }
141 }
142 } else if let Poll::Ready(reason) = me
143 .body_tx
144 .poll_reset(cx)
145 .map_err(crate::Error::new_body_write)?
146 {
147 debug!("stream received RST_STREAM: {:?}", reason);
148 return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(reason))));
149 }
150
151 match ready!(me.stream.as_mut().poll_frame(cx)) {
152 Some(Ok(frame)) => {
153 if frame.is_data() {
154 let chunk = frame.into_data().unwrap_or_else(|_| unreachable!());
155 let is_eos = me.stream.is_end_stream();
156 trace!(
157 "send body chunk: {} bytes, eos={}",
158 chunk.remaining(),
159 is_eos,
160 );
161
162 let buf = SendBuf::Buf(chunk);
163 me.body_tx
164 .send_data(buf, is_eos)
165 .map_err(crate::Error::new_body_write)?;
166
167 if is_eos {
168 return Poll::Ready(Ok(()));
169 }
170 } else if frame.is_trailers() {
171 me.body_tx.reserve_capacity(0);
173 me.body_tx
174 .send_trailers(frame.into_trailers().unwrap_or_else(|_| unreachable!()))
175 .map_err(crate::Error::new_body_write)?;
176 return Poll::Ready(Ok(()));
177 } else {
178 trace!("discarding unknown frame");
179 }
181 }
182 Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))),
183 None => {
184 return Poll::Ready(me.body_tx.send_eos_frame());
188 }
189 }
190 }
191 }
192}
193
/// Helper methods for a send stream, used by `PipeToSendStream` when the body
/// fails or ends.
trait SendStreamExt {
    /// Converts a body error `err` into a `crate::Error`; the implementation
    /// in this file also resets the stream before returning it.
    fn on_user_err<E>(&mut self, err: E) -> crate::Error
    where
        E: Into<Box<dyn std::error::Error + Send + Sync>>;
    /// Sends an empty data frame flagged as end-of-stream.
    fn send_eos_frame(&mut self) -> crate::Result<()>;
}
200
201impl<B: Buf> SendStreamExt for SendStream<SendBuf<B>> {
202 fn on_user_err<E>(&mut self, err: E) -> crate::Error
203 where
204 E: Into<Box<dyn std::error::Error + Send + Sync>>,
205 {
206 let err = crate::Error::new_user_body(err);
207 debug!("send body user stream error: {}", err);
208 self.send_reset(err.h2_reason());
209 err
210 }
211
212 fn send_eos_frame(&mut self) -> crate::Result<()> {
213 trace!("send body eos");
214 self.send_data(SendBuf::None, true)
215 .map_err(crate::Error::new_body_write)
216 }
217}
218
/// Buffer type written to an h2 `SendStream` by this module.
#[repr(usize)]
enum SendBuf<B> {
    /// A caller-provided buffer (a body data chunk in `PipeToSendStream`).
    Buf(B),
    /// Owned bytes with a read position; built from a byte slice in
    /// `UpgradedSendStream::write`.
    Cursor(Cursor<Box<[u8]>>),
    /// No bytes at all; used by `send_eos_frame` for the empty EOS frame.
    None,
}
225
226impl<B: Buf> Buf for SendBuf<B> {
227 #[inline]
228 fn remaining(&self) -> usize {
229 match *self {
230 Self::Buf(ref b) => b.remaining(),
231 Self::Cursor(ref c) => Buf::remaining(c),
232 Self::None => 0,
233 }
234 }
235
236 #[inline]
237 fn chunk(&self) -> &[u8] {
238 match *self {
239 Self::Buf(ref b) => b.chunk(),
240 Self::Cursor(ref c) => c.chunk(),
241 Self::None => &[],
242 }
243 }
244
245 #[inline]
246 fn advance(&mut self, cnt: usize) {
247 match *self {
248 Self::Buf(ref mut b) => b.advance(cnt),
249 Self::Cursor(ref mut c) => c.advance(cnt),
250 Self::None => {}
251 }
252 }
253
254 fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
255 match *self {
256 Self::Buf(ref b) => b.chunks_vectored(dst),
257 Self::Cursor(ref c) => c.chunks_vectored(dst),
258 Self::None => 0,
259 }
260 }
261}
262
/// Byte-stream adapter over an h2 stream pair: implements `Read` on top of
/// `recv_stream` and `Write` on top of `send_stream`.
struct H2Upgraded<B>
where
    B: Buf,
{
    /// Told how many bytes each received chunk contained (`record_data`).
    ping: Recorder,
    /// Outgoing half; written to by `poll_write` / `poll_shutdown`.
    send_stream: UpgradedSendStream<B>,
    /// Incoming half; polled for data by `poll_read`.
    recv_stream: RecvStream,
    /// Bytes of the most recently received chunk that have not yet been
    /// copied out to a reader.
    buf: Bytes,
}
272
273impl<B> Read for H2Upgraded<B>
274where
275 B: Buf,
276{
277 fn poll_read(
278 mut self: Pin<&mut Self>,
279 cx: &mut Context<'_>,
280 mut read_buf: ReadBufCursor<'_>,
281 ) -> Poll<Result<(), std::io::Error>> {
282 if self.buf.is_empty() {
283 self.buf = loop {
284 match ready!(self.recv_stream.poll_data(cx)) {
285 None => return Poll::Ready(Ok(())),
286 Some(Ok(buf)) if buf.is_empty() && !self.recv_stream.is_end_stream() => {
287 continue
288 }
289 Some(Ok(buf)) => {
290 self.ping.record_data(buf.len());
291 break buf;
292 }
293 Some(Err(e)) => {
294 return Poll::Ready(match e.reason() {
295 Some(Reason::NO_ERROR) | Some(Reason::CANCEL) => Ok(()),
296 Some(Reason::STREAM_CLOSED) => {
297 Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))
298 }
299 _ => Err(h2_to_io_error(e)),
300 })
301 }
302 }
303 };
304 }
305 let cnt = std::cmp::min(self.buf.len(), read_buf.remaining());
306 read_buf.put_slice(&self.buf[..cnt]);
307 self.buf.advance(cnt);
308 let _ = self.recv_stream.flow_control().release_capacity(cnt);
309 Poll::Ready(Ok(()))
310 }
311}
312
313impl<B> Write for H2Upgraded<B>
314where
315 B: Buf,
316{
317 fn poll_write(
318 mut self: Pin<&mut Self>,
319 cx: &mut Context<'_>,
320 buf: &[u8],
321 ) -> Poll<Result<usize, std::io::Error>> {
322 if buf.is_empty() {
323 return Poll::Ready(Ok(0));
324 }
325 self.send_stream.reserve_capacity(buf.len());
326
327 let cnt = match ready!(self.send_stream.poll_capacity(cx)) {
330 None => Some(0),
331 Some(Ok(cnt)) => self
332 .send_stream
333 .write(&buf[..cnt], false)
334 .ok()
335 .map(|()| cnt),
336 Some(Err(_)) => None,
337 };
338
339 if let Some(cnt) = cnt {
340 return Poll::Ready(Ok(cnt));
341 }
342
343 Poll::Ready(Err(h2_to_io_error(
344 match ready!(self.send_stream.poll_reset(cx)) {
345 Ok(Reason::NO_ERROR) | Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => {
346 return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into()))
347 }
348 Ok(reason) => reason.into(),
349 Err(e) => e,
350 },
351 )))
352 }
353
354 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
355 Poll::Ready(Ok(()))
356 }
357
358 fn poll_shutdown(
359 mut self: Pin<&mut Self>,
360 cx: &mut Context<'_>,
361 ) -> Poll<Result<(), std::io::Error>> {
362 if self.send_stream.write(&[], true).is_ok() {
363 return Poll::Ready(Ok(()));
364 }
365
366 Poll::Ready(Err(h2_to_io_error(
367 match ready!(self.send_stream.poll_reset(cx)) {
368 Ok(Reason::NO_ERROR) => return Poll::Ready(Ok(())),
369 Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => {
370 return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into()))
371 }
372 Ok(reason) => reason.into(),
373 Err(e) => e,
374 },
375 )))
376 }
377}
378
379fn h2_to_io_error(e: h2::Error) -> std::io::Error {
380 if e.is_io() {
381 e.into_io().unwrap()
382 } else {
383 std::io::Error::new(std::io::ErrorKind::Other, e)
384 }
385}
386
/// Wrapper around a `SendStream` whose buffer parameter has been swapped from
/// `B` to `Neutered<B>`; every method casts back to the `SendBuf<B>` form
/// before touching the stream.
///
/// NOTE(review): presumably this exists so the wrapper does not inherit
/// requirements (such as `Send`) from `B` — `Neutered<B>` is unconditionally
/// `Send` — confirm against the call sites.
struct UpgradedSendStream<B>(SendStream<SendBuf<Neutered<B>>>);

impl<B> UpgradedSendStream<B>
where
    B: Buf,
{
    /// Reinterprets `inner` as a stream over `SendBuf<Neutered<B>>`.
    ///
    /// # Safety
    ///
    /// This transmutes between two instantiations of `SendStream`. The
    /// assertion only checks that `B` and `Neutered<B>` have the same size.
    /// NOTE(review): the exact contract the caller must uphold is not visible
    /// in this part of the file — document it at the call site.
    unsafe fn new(inner: SendStream<SendBuf<B>>) -> Self {
        assert_eq!(mem::size_of::<B>(), mem::size_of::<Neutered<B>>());
        Self(mem::transmute(inner))
    }

    /// Forwards to `SendStream::reserve_capacity`.
    fn reserve_capacity(&mut self, cnt: usize) {
        unsafe { self.as_inner_unchecked().reserve_capacity(cnt) }
    }

    /// Forwards to `SendStream::poll_capacity`.
    fn poll_capacity(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<usize, h2::Error>>> {
        unsafe { self.as_inner_unchecked().poll_capacity(cx) }
    }

    /// Forwards to `SendStream::poll_reset`.
    fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<h2::Reason, h2::Error>> {
        unsafe { self.as_inner_unchecked().poll_reset(cx) }
    }

    /// Copies `buf` into an owned `SendBuf::Cursor` and sends it, optionally
    /// ending the stream; h2 errors are converted with `h2_to_io_error`.
    ///
    /// Only the `Cursor` variant is ever sent from here, never `SendBuf::Buf`.
    fn write(&mut self, buf: &[u8], end_of_stream: bool) -> Result<(), std::io::Error> {
        let send_buf = SendBuf::Cursor(Cursor::new(buf.into()));
        unsafe {
            self.as_inner_unchecked()
                .send_data(send_buf, end_of_stream)
                .map_err(h2_to_io_error)
        }
    }

    /// Casts the wrapped stream back to its `SendBuf<B>` form.
    ///
    /// # Safety
    ///
    /// Relies on the pointer cast between `SendStream<SendBuf<Neutered<B>>>`
    /// and `SendStream<SendBuf<B>>` being valid, i.e. the reverse of the
    /// transmute performed in `new`.
    unsafe fn as_inner_unchecked(&mut self) -> &mut SendStream<SendBuf<B>> {
        &mut *(&mut self.0 as *mut _ as *mut _)
    }
}
423
/// Stand-in for `B` that can never be constructed, because it contains a
/// value of the variant-less enum `Impossible`.
///
/// It is `repr(transparent)` over `_inner: B`.
#[repr(transparent)]
struct Neutered<B> {
    _inner: B,
    impossible: Impossible,
}

/// Enum with no variants; no value of this type can exist.
enum Impossible {}

// NOTE(review): presumably sound because a `Neutered<B>` value can never
// exist, so no `B` is ever actually moved across threads through it — confirm.
unsafe impl<B> Send for Neutered<B> {}
433
/// `Buf` implementation that exists only to satisfy trait bounds: each method
/// is an empty match on the uninhabited `impossible` field, so none of them
/// can actually run.
impl<B> Buf for Neutered<B> {
    fn remaining(&self) -> usize {
        match self.impossible {}
    }

    fn chunk(&self) -> &[u8] {
        match self.impossible {}
    }

    fn advance(&mut self, _cnt: usize) {
        match self.impossible {}
    }
}