hyper/proto/h1/
conn.rs

1use std::fmt;
2#[cfg(feature = "server")]
3use std::future::Future;
4use std::io;
5use std::marker::{PhantomData, Unpin};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8#[cfg(feature = "server")]
9use std::time::{Duration, Instant};
10
11use crate::rt::{Read, Write};
12use bytes::{Buf, Bytes};
13use futures_util::ready;
14use http::header::{HeaderValue, CONNECTION, TE};
15use http::{HeaderMap, Method, Version};
16use http_body::Frame;
17use httparse::ParserConfig;
18
19use super::io::Buffered;
20use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants};
21use crate::body::DecodedLength;
22#[cfg(feature = "server")]
23use crate::common::time::Time;
24use crate::headers;
25use crate::proto::{BodyLength, MessageHead};
26#[cfg(feature = "server")]
27use crate::rt::Sleep;
28
29const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
30
31/// This handles a connection, which will have been established over an
32/// `Read + Write` (like a socket), and will likely include multiple
33/// `Transaction`s over HTTP.
34///
35/// The connection will determine when a message begins and ends as well as
36/// determine if this connection can be kept alive after the message,
37/// or if it is complete.
38pub(crate) struct Conn<I, B, T> {
39    io: Buffered<I, EncodedBuf<B>>,
40    state: State,
41    _marker: PhantomData<fn(T)>,
42}
43
44impl<I, B, T> Conn<I, B, T>
45where
46    I: Read + Write + Unpin,
47    B: Buf,
48    T: Http1Transaction,
49{
50    pub(crate) fn new(io: I) -> Conn<I, B, T> {
51        Conn {
52            io: Buffered::new(io),
53            state: State {
54                allow_half_close: false,
55                cached_headers: None,
56                error: None,
57                keep_alive: KA::Busy,
58                method: None,
59                h1_parser_config: ParserConfig::default(),
60                h1_max_headers: None,
61                #[cfg(feature = "server")]
62                h1_header_read_timeout: None,
63                #[cfg(feature = "server")]
64                h1_header_read_timeout_fut: None,
65                #[cfg(feature = "server")]
66                h1_header_read_timeout_running: false,
67                #[cfg(feature = "server")]
68                date_header: true,
69                #[cfg(feature = "server")]
70                timer: Time::Empty,
71                preserve_header_case: false,
72                #[cfg(feature = "ffi")]
73                preserve_header_order: false,
74                title_case_headers: false,
75                h09_responses: false,
76                #[cfg(feature = "client")]
77                on_informational: None,
78                notify_read: false,
79                reading: Reading::Init,
80                writing: Writing::Init,
81                upgrade: None,
82                // We assume a modern world where the remote speaks HTTP/1.1.
83                // If they tell us otherwise, we'll downgrade in `read_head`.
84                version: Version::HTTP_11,
85                allow_trailer_fields: false,
86            },
87            _marker: PhantomData,
88        }
89    }
90
91    #[cfg(feature = "server")]
92    pub(crate) fn set_timer(&mut self, timer: Time) {
93        self.state.timer = timer;
94    }
95
96    #[cfg(feature = "server")]
97    pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) {
98        self.io.set_flush_pipeline(enabled);
99    }
100
101    pub(crate) fn set_write_strategy_queue(&mut self) {
102        self.io.set_write_strategy_queue();
103    }
104
105    pub(crate) fn set_max_buf_size(&mut self, max: usize) {
106        self.io.set_max_buf_size(max);
107    }
108
109    #[cfg(feature = "client")]
110    pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) {
111        self.io.set_read_buf_exact_size(sz);
112    }
113
114    pub(crate) fn set_write_strategy_flatten(&mut self) {
115        self.io.set_write_strategy_flatten();
116    }
117
118    pub(crate) fn set_h1_parser_config(&mut self, parser_config: ParserConfig) {
119        self.state.h1_parser_config = parser_config;
120    }
121
122    pub(crate) fn set_title_case_headers(&mut self) {
123        self.state.title_case_headers = true;
124    }
125
126    pub(crate) fn set_preserve_header_case(&mut self) {
127        self.state.preserve_header_case = true;
128    }
129
130    #[cfg(feature = "ffi")]
131    pub(crate) fn set_preserve_header_order(&mut self) {
132        self.state.preserve_header_order = true;
133    }
134
135    #[cfg(feature = "client")]
136    pub(crate) fn set_h09_responses(&mut self) {
137        self.state.h09_responses = true;
138    }
139
140    pub(crate) fn set_http1_max_headers(&mut self, val: usize) {
141        self.state.h1_max_headers = Some(val);
142    }
143
144    #[cfg(feature = "server")]
145    pub(crate) fn set_http1_header_read_timeout(&mut self, val: Duration) {
146        self.state.h1_header_read_timeout = Some(val);
147    }
148
149    #[cfg(feature = "server")]
150    pub(crate) fn set_allow_half_close(&mut self) {
151        self.state.allow_half_close = true;
152    }
153
154    #[cfg(feature = "server")]
155    pub(crate) fn disable_date_header(&mut self) {
156        self.state.date_header = false;
157    }
158
159    pub(crate) fn into_inner(self) -> (I, Bytes) {
160        self.io.into_inner()
161    }
162
163    pub(crate) fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> {
164        self.state.upgrade.take()
165    }
166
167    pub(crate) fn is_read_closed(&self) -> bool {
168        self.state.is_read_closed()
169    }
170
171    pub(crate) fn is_write_closed(&self) -> bool {
172        self.state.is_write_closed()
173    }
174
175    pub(crate) fn can_read_head(&self) -> bool {
176        if !matches!(self.state.reading, Reading::Init) {
177            return false;
178        }
179
180        if T::should_read_first() {
181            return true;
182        }
183
184        !matches!(self.state.writing, Writing::Init)
185    }
186
187    pub(crate) fn can_read_body(&self) -> bool {
188        matches!(
189            self.state.reading,
190            Reading::Body(..) | Reading::Continue(..)
191        )
192    }
193
194    #[cfg(feature = "server")]
195    pub(crate) fn has_initial_read_write_state(&self) -> bool {
196        matches!(self.state.reading, Reading::Init)
197            && matches!(self.state.writing, Writing::Init)
198            && self.io.read_buf().is_empty()
199    }
200
201    fn should_error_on_eof(&self) -> bool {
202        // If we're idle, it's probably just the connection closing gracefully.
203        T::should_error_on_parse_eof() && !self.state.is_idle()
204    }
205
206    fn has_h2_prefix(&self) -> bool {
207        let read_buf = self.io.read_buf();
208        read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE
209    }
210
211    pub(super) fn poll_read_head(
212        &mut self,
213        cx: &mut Context<'_>,
214    ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> {
215        debug_assert!(self.can_read_head());
216        trace!("Conn::read_head");
217
218        #[cfg(feature = "server")]
219        if !self.state.h1_header_read_timeout_running {
220            if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout {
221                let deadline = Instant::now() + h1_header_read_timeout;
222                self.state.h1_header_read_timeout_running = true;
223                match self.state.h1_header_read_timeout_fut {
224                    Some(ref mut h1_header_read_timeout_fut) => {
225                        trace!("resetting h1 header read timeout timer");
226                        self.state.timer.reset(h1_header_read_timeout_fut, deadline);
227                    }
228                    None => {
229                        trace!("setting h1 header read timeout timer");
230                        self.state.h1_header_read_timeout_fut =
231                            Some(self.state.timer.sleep_until(deadline));
232                    }
233                }
234            }
235        }
236
237        let msg = match self.io.parse::<T>(
238            cx,
239            ParseContext {
240                cached_headers: &mut self.state.cached_headers,
241                req_method: &mut self.state.method,
242                h1_parser_config: self.state.h1_parser_config.clone(),
243                h1_max_headers: self.state.h1_max_headers,
244                preserve_header_case: self.state.preserve_header_case,
245                #[cfg(feature = "ffi")]
246                preserve_header_order: self.state.preserve_header_order,
247                h09_responses: self.state.h09_responses,
248                #[cfg(feature = "client")]
249                on_informational: &mut self.state.on_informational,
250            },
251        ) {
252            Poll::Ready(Ok(msg)) => msg,
253            Poll::Ready(Err(e)) => return self.on_read_head_error(e),
254            Poll::Pending => {
255                #[cfg(feature = "server")]
256                if self.state.h1_header_read_timeout_running {
257                    if let Some(ref mut h1_header_read_timeout_fut) =
258                        self.state.h1_header_read_timeout_fut
259                    {
260                        if Pin::new(h1_header_read_timeout_fut).poll(cx).is_ready() {
261                            self.state.h1_header_read_timeout_running = false;
262
263                            warn!("read header from client timeout");
264                            return Poll::Ready(Some(Err(crate::Error::new_header_timeout())));
265                        }
266                    }
267                }
268
269                return Poll::Pending;
270            }
271        };
272
273        #[cfg(feature = "server")]
274        {
275            self.state.h1_header_read_timeout_running = false;
276            self.state.h1_header_read_timeout_fut = None;
277        }
278
279        // Note: don't deconstruct `msg` into local variables, it appears
280        // the optimizer doesn't remove the extra copies.
281
282        debug!("incoming body is {}", msg.decode);
283
284        // Prevent accepting HTTP/0.9 responses after the initial one, if any.
285        self.state.h09_responses = false;
286
287        // Drop any OnInformational callbacks, we're done there!
288        #[cfg(feature = "client")]
289        {
290            self.state.on_informational = None;
291        }
292
293        self.state.busy();
294        self.state.keep_alive &= msg.keep_alive;
295        self.state.version = msg.head.version;
296
297        let mut wants = if msg.wants_upgrade {
298            Wants::UPGRADE
299        } else {
300            Wants::EMPTY
301        };
302
303        if msg.decode == DecodedLength::ZERO {
304            if msg.expect_continue {
305                debug!("ignoring expect-continue since body is empty");
306            }
307            self.state.reading = Reading::KeepAlive;
308            if !T::should_read_first() {
309                self.try_keep_alive(cx);
310            }
311        } else if msg.expect_continue && msg.head.version.gt(&Version::HTTP_10) {
312            let h1_max_header_size = None; // TODO: remove this when we land h1_max_header_size support
313            self.state.reading = Reading::Continue(Decoder::new(
314                msg.decode,
315                self.state.h1_max_headers,
316                h1_max_header_size,
317            ));
318            wants = wants.add(Wants::EXPECT);
319        } else {
320            let h1_max_header_size = None; // TODO: remove this when we land h1_max_header_size support
321            self.state.reading = Reading::Body(Decoder::new(
322                msg.decode,
323                self.state.h1_max_headers,
324                h1_max_header_size,
325            ));
326        }
327
328        self.state.allow_trailer_fields = msg
329            .head
330            .headers
331            .get(TE)
332            .map_or(false, |te_header| te_header == "trailers");
333
334        Poll::Ready(Some(Ok((msg.head, msg.decode, wants))))
335    }
336
337    fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>> {
338        // If we are currently waiting on a message, then an empty
339        // message should be reported as an error. If not, it is just
340        // the connection closing gracefully.
341        let must_error = self.should_error_on_eof();
342        self.close_read();
343        self.io.consume_leading_lines();
344        let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty();
345        if was_mid_parse || must_error {
346            // We check if the buf contains the h2 Preface
347            debug!(
348                "parse error ({}) with {} bytes",
349                e,
350                self.io.read_buf().len()
351            );
352            match self.on_parse_error(e) {
353                Ok(()) => Poll::Pending, // XXX: wat?
354                Err(e) => Poll::Ready(Some(Err(e))),
355            }
356        } else {
357            debug!("read eof");
358            self.close_write();
359            Poll::Ready(None)
360        }
361    }
362
363    pub(crate) fn poll_read_body(
364        &mut self,
365        cx: &mut Context<'_>,
366    ) -> Poll<Option<io::Result<Frame<Bytes>>>> {
367        debug_assert!(self.can_read_body());
368
369        let (reading, ret) = match self.state.reading {
370            Reading::Body(ref mut decoder) => {
371                match ready!(decoder.decode(cx, &mut self.io)) {
372                    Ok(frame) => {
373                        if frame.is_data() {
374                            let slice = frame.data_ref().unwrap_or_else(|| unreachable!());
375                            let (reading, maybe_frame) = if decoder.is_eof() {
376                                debug!("incoming body completed");
377                                (
378                                    Reading::KeepAlive,
379                                    if !slice.is_empty() {
380                                        Some(Ok(frame))
381                                    } else {
382                                        None
383                                    },
384                                )
385                            } else if slice.is_empty() {
386                                error!("incoming body unexpectedly ended");
387                                // This should be unreachable, since all 3 decoders
388                                // either set eof=true or return an Err when reading
389                                // an empty slice...
390                                (Reading::Closed, None)
391                            } else {
392                                return Poll::Ready(Some(Ok(frame)));
393                            };
394                            (reading, Poll::Ready(maybe_frame))
395                        } else if frame.is_trailers() {
396                            (Reading::Closed, Poll::Ready(Some(Ok(frame))))
397                        } else {
398                            trace!("discarding unknown frame");
399                            (Reading::Closed, Poll::Ready(None))
400                        }
401                    }
402                    Err(e) => {
403                        debug!("incoming body decode error: {}", e);
404                        (Reading::Closed, Poll::Ready(Some(Err(e))))
405                    }
406                }
407            }
408            Reading::Continue(ref decoder) => {
409                // Write the 100 Continue if not already responded...
410                if let Writing::Init = self.state.writing {
411                    trace!("automatically sending 100 Continue");
412                    let cont = b"HTTP/1.1 100 Continue\r\n\r\n";
413                    self.io.headers_buf().extend_from_slice(cont);
414                }
415
416                // And now recurse once in the Reading::Body state...
417                self.state.reading = Reading::Body(decoder.clone());
418                return self.poll_read_body(cx);
419            }
420            _ => unreachable!("poll_read_body invalid state: {:?}", self.state.reading),
421        };
422
423        self.state.reading = reading;
424        self.try_keep_alive(cx);
425        ret
426    }
427
428    pub(crate) fn wants_read_again(&mut self) -> bool {
429        let ret = self.state.notify_read;
430        self.state.notify_read = false;
431        ret
432    }
433
434    pub(crate) fn poll_read_keep_alive(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
435        debug_assert!(!self.can_read_head() && !self.can_read_body());
436
437        if self.is_read_closed() {
438            Poll::Pending
439        } else if self.is_mid_message() {
440            self.mid_message_detect_eof(cx)
441        } else {
442            self.require_empty_read(cx)
443        }
444    }
445
446    fn is_mid_message(&self) -> bool {
447        !matches!(
448            (&self.state.reading, &self.state.writing),
449            (&Reading::Init, &Writing::Init)
450        )
451    }
452
453    // This will check to make sure the io object read is empty.
454    //
455    // This should only be called for Clients wanting to enter the idle
456    // state.
457    fn require_empty_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
458        debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
459        debug_assert!(!self.is_mid_message());
460        debug_assert!(T::is_client());
461
462        if !self.io.read_buf().is_empty() {
463            debug!("received an unexpected {} bytes", self.io.read_buf().len());
464            return Poll::Ready(Err(crate::Error::new_unexpected_message()));
465        }
466
467        let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
468
469        if num_read == 0 {
470            let ret = if self.should_error_on_eof() {
471                trace!("found unexpected EOF on busy connection: {:?}", self.state);
472                Poll::Ready(Err(crate::Error::new_incomplete()))
473            } else {
474                trace!("found EOF on idle connection, closing");
475                Poll::Ready(Ok(()))
476            };
477
478            // order is important: should_error needs state BEFORE close_read
479            self.state.close_read();
480            return ret;
481        }
482
483        debug!(
484            "received unexpected {} bytes on an idle connection",
485            num_read
486        );
487        Poll::Ready(Err(crate::Error::new_unexpected_message()))
488    }
489
490    fn mid_message_detect_eof(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
491        debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
492        debug_assert!(self.is_mid_message());
493
494        if self.state.allow_half_close || !self.io.read_buf().is_empty() {
495            return Poll::Pending;
496        }
497
498        let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
499
500        if num_read == 0 {
501            trace!("found unexpected EOF on busy connection: {:?}", self.state);
502            self.state.close_read();
503            Poll::Ready(Err(crate::Error::new_incomplete()))
504        } else {
505            Poll::Ready(Ok(()))
506        }
507    }
508
509    fn force_io_read(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
510        debug_assert!(!self.state.is_read_closed());
511
512        let result = ready!(self.io.poll_read_from_io(cx));
513        Poll::Ready(result.map_err(|e| {
514            trace!(error = %e, "force_io_read; io error");
515            self.state.close();
516            e
517        }))
518    }
519
520    fn maybe_notify(&mut self, cx: &mut Context<'_>) {
521        // its possible that we returned NotReady from poll() without having
522        // exhausted the underlying Io. We would have done this when we
523        // determined we couldn't keep reading until we knew how writing
524        // would finish.
525
526        match self.state.reading {
527            Reading::Continue(..) | Reading::Body(..) | Reading::KeepAlive | Reading::Closed => {
528                return
529            }
530            Reading::Init => (),
531        };
532
533        match self.state.writing {
534            Writing::Body(..) => return,
535            Writing::Init | Writing::KeepAlive | Writing::Closed => (),
536        }
537
538        if !self.io.is_read_blocked() {
539            if self.io.read_buf().is_empty() {
540                match self.io.poll_read_from_io(cx) {
541                    Poll::Ready(Ok(n)) => {
542                        if n == 0 {
543                            trace!("maybe_notify; read eof");
544                            if self.state.is_idle() {
545                                self.state.close();
546                            } else {
547                                self.close_read()
548                            }
549                            return;
550                        }
551                    }
552                    Poll::Pending => {
553                        trace!("maybe_notify; read_from_io blocked");
554                        return;
555                    }
556                    Poll::Ready(Err(e)) => {
557                        trace!("maybe_notify; read_from_io error: {}", e);
558                        self.state.close();
559                        self.state.error = Some(crate::Error::new_io(e));
560                    }
561                }
562            }
563            self.state.notify_read = true;
564        }
565    }
566
567    fn try_keep_alive(&mut self, cx: &mut Context<'_>) {
568        self.state.try_keep_alive::<T>();
569        self.maybe_notify(cx);
570    }
571
572    pub(crate) fn can_write_head(&self) -> bool {
573        if !T::should_read_first() && matches!(self.state.reading, Reading::Closed) {
574            return false;
575        }
576
577        match self.state.writing {
578            Writing::Init => self.io.can_headers_buf(),
579            _ => false,
580        }
581    }
582
583    pub(crate) fn can_write_body(&self) -> bool {
584        match self.state.writing {
585            Writing::Body(..) => true,
586            Writing::Init | Writing::KeepAlive | Writing::Closed => false,
587        }
588    }
589
590    pub(crate) fn can_buffer_body(&self) -> bool {
591        self.io.can_buffer()
592    }
593
594    pub(crate) fn write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>) {
595        if let Some(encoder) = self.encode_head(head, body) {
596            self.state.writing = if !encoder.is_eof() {
597                Writing::Body(encoder)
598            } else if encoder.is_last() {
599                Writing::Closed
600            } else {
601                Writing::KeepAlive
602            };
603        }
604    }
605
606    fn encode_head(
607        &mut self,
608        mut head: MessageHead<T::Outgoing>,
609        body: Option<BodyLength>,
610    ) -> Option<Encoder> {
611        debug_assert!(self.can_write_head());
612
613        if !T::should_read_first() {
614            self.state.busy();
615        }
616
617        self.enforce_version(&mut head);
618
619        let buf = self.io.headers_buf();
620        match super::role::encode_headers::<T>(
621            Encode {
622                head: &mut head,
623                body,
624                #[cfg(feature = "server")]
625                keep_alive: self.state.wants_keep_alive(),
626                req_method: &mut self.state.method,
627                title_case_headers: self.state.title_case_headers,
628                #[cfg(feature = "server")]
629                date_header: self.state.date_header,
630            },
631            buf,
632        ) {
633            Ok(encoder) => {
634                debug_assert!(self.state.cached_headers.is_none());
635                debug_assert!(head.headers.is_empty());
636                self.state.cached_headers = Some(head.headers);
637
638                #[cfg(feature = "client")]
639                {
640                    self.state.on_informational =
641                        head.extensions.remove::<crate::ext::OnInformational>();
642                }
643
644                Some(encoder)
645            }
646            Err(err) => {
647                self.state.error = Some(err);
648                self.state.writing = Writing::Closed;
649                None
650            }
651        }
652    }
653
654    // Fix keep-alive when Connection: keep-alive header is not present
655    fn fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>) {
656        let outgoing_is_keep_alive = head
657            .headers
658            .get(CONNECTION)
659            .map_or(false, headers::connection_keep_alive);
660
661        if !outgoing_is_keep_alive {
662            match head.version {
663                // If response is version 1.0 and keep-alive is not present in the response,
664                // disable keep-alive so the server closes the connection
665                Version::HTTP_10 => self.state.disable_keep_alive(),
666                // If response is version 1.1 and keep-alive is wanted, add
667                // Connection: keep-alive header when not present
668                Version::HTTP_11 => {
669                    if self.state.wants_keep_alive() {
670                        head.headers
671                            .insert(CONNECTION, HeaderValue::from_static("keep-alive"));
672                    }
673                }
674                _ => (),
675            }
676        }
677    }
678
679    // If we know the remote speaks an older version, we try to fix up any messages
680    // to work with our older peer.
681    fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) {
682        match self.state.version {
683            Version::HTTP_10 => {
684                // Fixes response or connection when keep-alive header is not present
685                self.fix_keep_alive(head);
686                // If the remote only knows HTTP/1.0, we should force ourselves
687                // to do only speak HTTP/1.0 as well.
688                head.version = Version::HTTP_10;
689            }
690            Version::HTTP_11 => {
691                if let KA::Disabled = self.state.keep_alive.status() {
692                    head.headers
693                        .insert(CONNECTION, HeaderValue::from_static("close"));
694                }
695            }
696            _ => (),
697        }
698        // If the remote speaks HTTP/1.1, then it *should* be fine with
699        // both HTTP/1.0 and HTTP/1.1 from us. So again, we just let
700        // the user's headers be.
701    }
702
703    pub(crate) fn write_body(&mut self, chunk: B) {
704        debug_assert!(self.can_write_body() && self.can_buffer_body());
705        // empty chunks should be discarded at Dispatcher level
706        debug_assert!(chunk.remaining() != 0);
707
708        let state = match self.state.writing {
709            Writing::Body(ref mut encoder) => {
710                self.io.buffer(encoder.encode(chunk));
711
712                if !encoder.is_eof() {
713                    return;
714                }
715
716                if encoder.is_last() {
717                    Writing::Closed
718                } else {
719                    Writing::KeepAlive
720                }
721            }
722            _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
723        };
724
725        self.state.writing = state;
726    }
727
728    pub(crate) fn write_trailers(&mut self, trailers: HeaderMap) {
729        if T::is_server() && !self.state.allow_trailer_fields {
730            debug!("trailers not allowed to be sent");
731            return;
732        }
733        debug_assert!(self.can_write_body() && self.can_buffer_body());
734
735        match self.state.writing {
736            Writing::Body(ref encoder) => {
737                if let Some(enc_buf) =
738                    encoder.encode_trailers(trailers, self.state.title_case_headers)
739                {
740                    self.io.buffer(enc_buf);
741
742                    self.state.writing = if encoder.is_last() || encoder.is_close_delimited() {
743                        Writing::Closed
744                    } else {
745                        Writing::KeepAlive
746                    };
747                }
748            }
749            _ => unreachable!("write_trailers invalid state: {:?}", self.state.writing),
750        }
751    }
752
753    pub(crate) fn write_body_and_end(&mut self, chunk: B) {
754        debug_assert!(self.can_write_body() && self.can_buffer_body());
755        // empty chunks should be discarded at Dispatcher level
756        debug_assert!(chunk.remaining() != 0);
757
758        let state = match self.state.writing {
759            Writing::Body(ref encoder) => {
760                let can_keep_alive = encoder.encode_and_end(chunk, self.io.write_buf());
761                if can_keep_alive {
762                    Writing::KeepAlive
763                } else {
764                    Writing::Closed
765                }
766            }
767            _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
768        };
769
770        self.state.writing = state;
771    }
772
773    pub(crate) fn end_body(&mut self) -> crate::Result<()> {
774        debug_assert!(self.can_write_body());
775
776        let encoder = match self.state.writing {
777            Writing::Body(ref mut enc) => enc,
778            _ => return Ok(()),
779        };
780
781        // end of stream, that means we should try to eof
782        match encoder.end() {
783            Ok(end) => {
784                if let Some(end) = end {
785                    self.io.buffer(end);
786                }
787
788                self.state.writing = if encoder.is_last() || encoder.is_close_delimited() {
789                    Writing::Closed
790                } else {
791                    Writing::KeepAlive
792                };
793
794                Ok(())
795            }
796            Err(not_eof) => {
797                self.state.writing = Writing::Closed;
798                Err(crate::Error::new_body_write_aborted().with(not_eof))
799            }
800        }
801    }
802
803    // When we get a parse error, depending on what side we are, we might be able
804    // to write a response before closing the connection.
805    //
806    // - Client: there is nothing we can do
807    // - Server: if Response hasn't been written yet, we can send a 4xx response
808    fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> {
809        if let Writing::Init = self.state.writing {
810            if self.has_h2_prefix() {
811                return Err(crate::Error::new_version_h2());
812            }
813            if let Some(msg) = T::on_error(&err) {
814                // Drop the cached headers so as to not trigger a debug
815                // assert in `write_head`...
816                self.state.cached_headers.take();
817                self.write_head(msg, None);
818                self.state.error = Some(err);
819                return Ok(());
820            }
821        }
822
823        // fallback is pass the error back up
824        Err(err)
825    }
826
827    pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
828        ready!(Pin::new(&mut self.io).poll_flush(cx))?;
829        self.try_keep_alive(cx);
830        trace!("flushed({}): {:?}", T::LOG, self.state);
831        Poll::Ready(Ok(()))
832    }
833
834    pub(crate) fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
835        match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) {
836            Ok(()) => {
837                trace!("shut down IO complete");
838                Poll::Ready(Ok(()))
839            }
840            Err(e) => {
841                debug!("error shutting down IO: {}", e);
842                Poll::Ready(Err(e))
843            }
844        }
845    }
846
847    /// If the read side can be cheaply drained, do so. Otherwise, close.
848    pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut Context<'_>) {
849        if let Reading::Continue(ref decoder) = self.state.reading {
850            // skip sending the 100-continue
851            // just move forward to a read, in case a tiny body was included
852            self.state.reading = Reading::Body(decoder.clone());
853        }
854
855        let _ = self.poll_read_body(cx);
856
857        // If still in Reading::Body, just give up
858        match self.state.reading {
859            Reading::Init | Reading::KeepAlive => {
860                trace!("body drained")
861            }
862            _ => self.close_read(),
863        }
864    }
865
866    pub(crate) fn close_read(&mut self) {
867        self.state.close_read();
868    }
869
870    pub(crate) fn close_write(&mut self) {
871        self.state.close_write();
872    }
873
874    #[cfg(feature = "server")]
875    pub(crate) fn disable_keep_alive(&mut self) {
876        if self.state.is_idle() {
877            trace!("disable_keep_alive; closing idle connection");
878            self.state.close();
879        } else {
880            trace!("disable_keep_alive; in-progress connection");
881            self.state.disable_keep_alive();
882        }
883    }
884
885    pub(crate) fn take_error(&mut self) -> crate::Result<()> {
886        if let Some(err) = self.state.error.take() {
887            Err(err)
888        } else {
889            Ok(())
890        }
891    }
892
893    pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
894        trace!("{}: prepare possible HTTP upgrade", T::LOG);
895        self.state.prepare_upgrade()
896    }
897}
898
899impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> {
900    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
901        f.debug_struct("Conn")
902            .field("state", &self.state)
903            .field("io", &self.io)
904            .finish()
905    }
906}
907
908// B and T are never pinned
909impl<I: Unpin, B, T> Unpin for Conn<I, B, T> {}
910
911struct State {
912    allow_half_close: bool,
913    /// Re-usable HeaderMap to reduce allocating new ones.
914    cached_headers: Option<HeaderMap>,
915    /// If an error occurs when there wasn't a direct way to return it
916    /// back to the user, this is set.
917    error: Option<crate::Error>,
918    /// Current keep-alive status.
919    keep_alive: KA,
920    /// If mid-message, the HTTP Method that started it.
921    ///
922    /// This is used to know things such as if the message can include
923    /// a body or not.
924    method: Option<Method>,
925    h1_parser_config: ParserConfig,
926    h1_max_headers: Option<usize>,
927    #[cfg(feature = "server")]
928    h1_header_read_timeout: Option<Duration>,
929    #[cfg(feature = "server")]
930    h1_header_read_timeout_fut: Option<Pin<Box<dyn Sleep>>>,
931    #[cfg(feature = "server")]
932    h1_header_read_timeout_running: bool,
933    #[cfg(feature = "server")]
934    date_header: bool,
935    #[cfg(feature = "server")]
936    timer: Time,
937    preserve_header_case: bool,
938    #[cfg(feature = "ffi")]
939    preserve_header_order: bool,
940    title_case_headers: bool,
941    h09_responses: bool,
942    /// If set, called with each 1xx informational response received for
943    /// the current request. MUST be unset after a non-1xx response is
944    /// received.
945    #[cfg(feature = "client")]
946    on_informational: Option<crate::ext::OnInformational>,
947    /// Set to true when the Dispatcher should poll read operations
948    /// again. See the `maybe_notify` method for more.
949    notify_read: bool,
950    /// State of allowed reads
951    reading: Reading,
952    /// State of allowed writes
953    writing: Writing,
954    /// An expected pending HTTP upgrade.
955    upgrade: Option<crate::upgrade::Pending>,
956    /// Either HTTP/1.0 or 1.1 connection
957    version: Version,
958    /// Flag to track if trailer fields are allowed to be sent
959    allow_trailer_fields: bool,
960}
961
962#[derive(Debug)]
963enum Reading {
964    Init,
965    Continue(Decoder),
966    Body(Decoder),
967    KeepAlive,
968    Closed,
969}
970
971enum Writing {
972    Init,
973    Body(Encoder),
974    KeepAlive,
975    Closed,
976}
977
978impl fmt::Debug for State {
979    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
980        let mut builder = f.debug_struct("State");
981        builder
982            .field("reading", &self.reading)
983            .field("writing", &self.writing)
984            .field("keep_alive", &self.keep_alive);
985
986        // Only show error field if it's interesting...
987        if let Some(ref error) = self.error {
988            builder.field("error", error);
989        }
990
991        if self.allow_half_close {
992            builder.field("allow_half_close", &true);
993        }
994
995        // Purposefully leaving off other fields..
996
997        builder.finish()
998    }
999}
1000
1001impl fmt::Debug for Writing {
1002    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1003        match *self {
1004            Writing::Init => f.write_str("Init"),
1005            Writing::Body(ref enc) => f.debug_tuple("Body").field(enc).finish(),
1006            Writing::KeepAlive => f.write_str("KeepAlive"),
1007            Writing::Closed => f.write_str("Closed"),
1008        }
1009    }
1010}
1011
1012impl std::ops::BitAndAssign<bool> for KA {
1013    fn bitand_assign(&mut self, enabled: bool) {
1014        if !enabled {
1015            trace!("remote disabling keep-alive");
1016            *self = KA::Disabled;
1017        }
1018    }
1019}
1020
1021#[derive(Clone, Copy, Debug, Default)]
1022enum KA {
1023    Idle,
1024    #[default]
1025    Busy,
1026    Disabled,
1027}
1028
1029impl KA {
1030    fn idle(&mut self) {
1031        *self = KA::Idle;
1032    }
1033
1034    fn busy(&mut self) {
1035        *self = KA::Busy;
1036    }
1037
1038    fn disable(&mut self) {
1039        *self = KA::Disabled;
1040    }
1041
1042    fn status(&self) -> KA {
1043        *self
1044    }
1045}
1046
1047impl State {
1048    fn close(&mut self) {
1049        trace!("State::close()");
1050        self.reading = Reading::Closed;
1051        self.writing = Writing::Closed;
1052        self.keep_alive.disable();
1053    }
1054
1055    fn close_read(&mut self) {
1056        trace!("State::close_read()");
1057        self.reading = Reading::Closed;
1058        self.keep_alive.disable();
1059    }
1060
1061    fn close_write(&mut self) {
1062        trace!("State::close_write()");
1063        self.writing = Writing::Closed;
1064        self.keep_alive.disable();
1065    }
1066
1067    fn wants_keep_alive(&self) -> bool {
1068        !matches!(self.keep_alive.status(), KA::Disabled)
1069    }
1070
1071    fn try_keep_alive<T: Http1Transaction>(&mut self) {
1072        match (&self.reading, &self.writing) {
1073            (&Reading::KeepAlive, &Writing::KeepAlive) => {
1074                if let KA::Busy = self.keep_alive.status() {
1075                    self.idle::<T>();
1076                } else {
1077                    trace!(
1078                        "try_keep_alive({}): could keep-alive, but status = {:?}",
1079                        T::LOG,
1080                        self.keep_alive
1081                    );
1082                    self.close();
1083                }
1084            }
1085            (&Reading::Closed, &Writing::KeepAlive) | (&Reading::KeepAlive, &Writing::Closed) => {
1086                self.close()
1087            }
1088            _ => (),
1089        }
1090    }
1091
1092    fn disable_keep_alive(&mut self) {
1093        self.keep_alive.disable()
1094    }
1095
1096    fn busy(&mut self) {
1097        if let KA::Disabled = self.keep_alive.status() {
1098            return;
1099        }
1100        self.keep_alive.busy();
1101    }
1102
1103    fn idle<T: Http1Transaction>(&mut self) {
1104        debug_assert!(!self.is_idle(), "State::idle() called while idle");
1105
1106        self.method = None;
1107        self.keep_alive.idle();
1108
1109        if !self.is_idle() {
1110            self.close();
1111            return;
1112        }
1113
1114        self.reading = Reading::Init;
1115        self.writing = Writing::Init;
1116
1117        // !T::should_read_first() means Client.
1118        //
1119        // If Client connection has just gone idle, the Dispatcher
1120        // should try the poll loop one more time, so as to poll the
1121        // pending requests stream.
1122        if !T::should_read_first() {
1123            self.notify_read = true;
1124        }
1125
1126        #[cfg(feature = "server")]
1127        if self.h1_header_read_timeout.is_some() {
1128            // Next read will start and poll the header read timeout,
1129            // so we can close the connection if another header isn't
1130            // received in a timely manner.
1131            self.notify_read = true;
1132        }
1133    }
1134
1135    fn is_idle(&self) -> bool {
1136        matches!(self.keep_alive.status(), KA::Idle)
1137    }
1138
1139    fn is_read_closed(&self) -> bool {
1140        matches!(self.reading, Reading::Closed)
1141    }
1142
1143    fn is_write_closed(&self) -> bool {
1144        matches!(self.writing, Writing::Closed)
1145    }
1146
1147    fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
1148        let (tx, rx) = crate::upgrade::pending();
1149        self.upgrade = Some(tx);
1150        rx
1151    }
1152}
1153
1154#[cfg(test)]
1155mod tests {
1156    #[cfg(all(feature = "nightly", not(miri)))]
1157    #[bench]
1158    fn bench_read_head_short(b: &mut ::test::Bencher) {
1159        use super::*;
1160        use crate::common::io::Compat;
1161        let s = b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n";
1162        let len = s.len();
1163        b.bytes = len as u64;
1164
1165        // an empty IO, we'll be skipping and using the read buffer anyways
1166        let io = Compat(tokio_test::io::Builder::new().build());
1167        let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io);
1168        *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]);
1169        conn.state.cached_headers = Some(HeaderMap::with_capacity(2));
1170
1171        let rt = tokio::runtime::Builder::new_current_thread()
1172            .enable_all()
1173            .build()
1174            .unwrap();
1175
1176        b.iter(|| {
1177            rt.block_on(futures_util::future::poll_fn(|cx| {
1178                match conn.poll_read_head(cx) {
1179                    Poll::Ready(Some(Ok(x))) => {
1180                        ::test::black_box(&x);
1181                        let mut headers = x.0.headers;
1182                        headers.clear();
1183                        conn.state.cached_headers = Some(headers);
1184                    }
1185                    f => panic!("expected Ready(Some(Ok(..))): {:?}", f),
1186                }
1187
1188                conn.io.read_buf_mut().reserve(1);
1189                unsafe {
1190                    conn.io.read_buf_mut().set_len(len);
1191                }
1192                conn.state.reading = Reading::Init;
1193                Poll::Ready(())
1194            }));
1195        });
1196    }
1197
1198    /*
1199    //TODO: rewrite these using dispatch... someday...
1200    use futures::{Async, Future, Stream, Sink};
1201    use futures::future;
1202
1203    use proto::{self, ClientTransaction, MessageHead, ServerTransaction};
1204    use super::super::Encoder;
1205    use mock::AsyncIo;
1206
1207    use super::{Conn, Decoder, Reading, Writing};
1208    use ::uri::Uri;
1209
1210    use std::str::FromStr;
1211
1212    #[test]
1213    fn test_conn_init_read() {
1214        let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec();
1215        let len = good_message.len();
1216        let io = AsyncIo::new_buf(good_message, len);
1217        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1218
1219        match conn.poll().unwrap() {
1220            Async::Ready(Some(Frame::Message { message, body: false })) => {
1221                assert_eq!(message, MessageHead {
1222                    subject: ::proto::RequestLine(::Get, Uri::from_str("/").unwrap()),
1223                    .. MessageHead::default()
1224                })
1225            },
1226            f => panic!("frame is not Frame::Message: {:?}", f)
1227        }
1228    }
1229
1230    #[test]
1231    fn test_conn_parse_partial() {
1232        let _: Result<(), ()> = future::lazy(|| {
1233            let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec();
1234            let io = AsyncIo::new_buf(good_message, 10);
1235            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1236            assert!(conn.poll().unwrap().is_not_ready());
1237            conn.io.io_mut().block_in(50);
1238            let async = conn.poll().unwrap();
1239            assert!(async.is_ready());
1240            match async {
1241                Async::Ready(Some(Frame::Message { .. })) => (),
1242                f => panic!("frame is not Message: {:?}", f),
1243            }
1244            Ok(())
1245        }).wait();
1246    }
1247
1248    #[test]
1249    fn test_conn_init_read_eof_idle() {
1250        let io = AsyncIo::new_buf(vec![], 1);
1251        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1252        conn.state.idle();
1253
1254        match conn.poll().unwrap() {
1255            Async::Ready(None) => {},
1256            other => panic!("frame is not None: {:?}", other)
1257        }
1258    }
1259
1260    #[test]
1261    fn test_conn_init_read_eof_idle_partial_parse() {
1262        let io = AsyncIo::new_buf(b"GET / HTTP/1.1".to_vec(), 100);
1263        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1264        conn.state.idle();
1265
1266        match conn.poll() {
1267            Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1268            other => panic!("unexpected frame: {:?}", other)
1269        }
1270    }
1271
1272    #[test]
1273    fn test_conn_init_read_eof_busy() {
1274        let _: Result<(), ()> = future::lazy(|| {
1275            // server ignores
1276            let io = AsyncIo::new_eof();
1277            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1278            conn.state.busy();
1279
1280            match conn.poll().unwrap() {
1281                Async::Ready(None) => {},
1282                other => panic!("unexpected frame: {:?}", other)
1283            }
1284
1285            // client
1286            let io = AsyncIo::new_eof();
1287            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1288            conn.state.busy();
1289
1290            match conn.poll() {
1291                Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1292                other => panic!("unexpected frame: {:?}", other)
1293            }
1294            Ok(())
1295        }).wait();
1296    }
1297
1298    #[test]
1299    fn test_conn_body_finish_read_eof() {
1300        let _: Result<(), ()> = future::lazy(|| {
1301            let io = AsyncIo::new_eof();
1302            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1303            conn.state.busy();
1304            conn.state.writing = Writing::KeepAlive;
1305            conn.state.reading = Reading::Body(Decoder::length(0));
1306
1307            match conn.poll() {
1308                Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1309                other => panic!("unexpected frame: {:?}", other)
1310            }
1311
1312            // conn eofs, but tokio-proto will call poll() again, before calling flush()
1313            // the conn eof in this case is perfectly fine
1314
1315            match conn.poll() {
1316                Ok(Async::Ready(None)) => (),
1317                other => panic!("unexpected frame: {:?}", other)
1318            }
1319            Ok(())
1320        }).wait();
1321    }
1322
1323    #[test]
1324    fn test_conn_message_empty_body_read_eof() {
1325        let _: Result<(), ()> = future::lazy(|| {
1326            let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(), 1024);
1327            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1328            conn.state.busy();
1329            conn.state.writing = Writing::KeepAlive;
1330
1331            match conn.poll() {
1332                Ok(Async::Ready(Some(Frame::Message { body: false, .. }))) => (),
1333                other => panic!("unexpected frame: {:?}", other)
1334            }
1335
1336            // conn eofs, but tokio-proto will call poll() again, before calling flush()
1337            // the conn eof in this case is perfectly fine
1338
1339            match conn.poll() {
1340                Ok(Async::Ready(None)) => (),
1341                other => panic!("unexpected frame: {:?}", other)
1342            }
1343            Ok(())
1344        }).wait();
1345    }
1346
1347    #[test]
1348    fn test_conn_read_body_end() {
1349        let _: Result<(), ()> = future::lazy(|| {
1350            let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024);
1351            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1352            conn.state.busy();
1353
1354            match conn.poll() {
1355                Ok(Async::Ready(Some(Frame::Message { body: true, .. }))) => (),
1356                other => panic!("unexpected frame: {:?}", other)
1357            }
1358
1359            match conn.poll() {
1360                Ok(Async::Ready(Some(Frame::Body { chunk: Some(_) }))) => (),
1361                other => panic!("unexpected frame: {:?}", other)
1362            }
1363
1364            // When the body is done, `poll` MUST return a `Body` frame with chunk set to `None`
1365            match conn.poll() {
1366                Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1367                other => panic!("unexpected frame: {:?}", other)
1368            }
1369
1370            match conn.poll() {
1371                Ok(Async::NotReady) => (),
1372                other => panic!("unexpected frame: {:?}", other)
1373            }
1374            Ok(())
1375        }).wait();
1376    }
1377
1378    #[test]
1379    fn test_conn_closed_read() {
1380        let io = AsyncIo::new_buf(vec![], 0);
1381        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1382        conn.state.close();
1383
1384        match conn.poll().unwrap() {
1385            Async::Ready(None) => {},
1386            other => panic!("frame is not None: {:?}", other)
1387        }
1388    }
1389
1390    #[test]
1391    fn test_conn_body_write_length() {
1392        let _ = pretty_env_logger::try_init();
1393        let _: Result<(), ()> = future::lazy(|| {
1394            let io = AsyncIo::new_buf(vec![], 0);
1395            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1396            let max = super::super::io::DEFAULT_MAX_BUFFER_SIZE + 4096;
1397            conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64));
1398
1399            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready());
1400            assert!(!conn.can_buffer_body());
1401
1402            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 8].into()) }).unwrap().is_not_ready());
1403
1404            conn.io.io_mut().block_in(1024 * 3);
1405            assert!(conn.poll_complete().unwrap().is_not_ready());
1406            conn.io.io_mut().block_in(1024 * 3);
1407            assert!(conn.poll_complete().unwrap().is_not_ready());
1408            conn.io.io_mut().block_in(max * 2);
1409            assert!(conn.poll_complete().unwrap().is_ready());
1410
1411            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 8].into()) }).unwrap().is_ready());
1412            Ok(())
1413        }).wait();
1414    }
1415
1416    #[test]
1417    fn test_conn_body_write_chunked() {
1418        let _: Result<(), ()> = future::lazy(|| {
1419            let io = AsyncIo::new_buf(vec![], 4096);
1420            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1421            conn.state.writing = Writing::Body(Encoder::chunked());
1422
1423            assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready());
1424            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'x'; 8192].into()) }).unwrap().is_ready());
1425            Ok(())
1426        }).wait();
1427    }
1428
1429    #[test]
1430    fn test_conn_body_flush() {
1431        let _: Result<(), ()> = future::lazy(|| {
1432            let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5);
1433            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1434            conn.state.writing = Writing::Body(Encoder::length(1024 * 1024));
1435            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready());
1436            assert!(!conn.can_buffer_body());
1437            conn.io.io_mut().block_in(1024 * 1024 * 5);
1438            assert!(conn.poll_complete().unwrap().is_ready());
1439            assert!(conn.can_buffer_body());
1440            assert!(conn.io.io_mut().flushed());
1441
1442            Ok(())
1443        }).wait();
1444    }
1445
1446    #[test]
1447    fn test_conn_parking() {
1448        use std::sync::Arc;
1449        use futures::executor::Notify;
1450        use futures::executor::NotifyHandle;
1451
1452        struct Car {
1453            permit: bool,
1454        }
1455        impl Notify for Car {
1456            fn notify(&self, _id: usize) {
1457                assert!(self.permit, "unparked without permit");
1458            }
1459        }
1460
1461        fn car(permit: bool) -> NotifyHandle {
1462            Arc::new(Car {
1463                permit: permit,
1464            }).into()
1465        }
1466
1467        // test that once writing is done, unparks
1468        let f = future::lazy(|| {
1469            let io = AsyncIo::new_buf(vec![], 4096);
1470            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1471            conn.state.reading = Reading::KeepAlive;
1472            assert!(conn.poll().unwrap().is_not_ready());
1473
1474            conn.state.writing = Writing::KeepAlive;
1475            assert!(conn.poll_complete().unwrap().is_ready());
1476            Ok::<(), ()>(())
1477        });
1478        ::futures::executor::spawn(f).poll_future_notify(&car(true), 0).unwrap();
1479
1480
1481        // test that flushing when not waiting on read doesn't unpark
1482        let f = future::lazy(|| {
1483            let io = AsyncIo::new_buf(vec![], 4096);
1484            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1485            conn.state.writing = Writing::KeepAlive;
1486            assert!(conn.poll_complete().unwrap().is_ready());
1487            Ok::<(), ()>(())
1488        });
1489        ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1490
1491
1492        // test that flushing and writing isn't done doesn't unpark
1493        let f = future::lazy(|| {
1494            let io = AsyncIo::new_buf(vec![], 4096);
1495            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1496            conn.state.reading = Reading::KeepAlive;
1497            assert!(conn.poll().unwrap().is_not_ready());
1498            conn.state.writing = Writing::Body(Encoder::length(5_000));
1499            assert!(conn.poll_complete().unwrap().is_ready());
1500            Ok::<(), ()>(())
1501        });
1502        ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1503    }
1504
1505    #[test]
1506    fn test_conn_closed_write() {
1507        let io = AsyncIo::new_buf(vec![], 0);
1508        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1509        conn.state.close();
1510
1511        match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) {
1512            Err(_e) => {},
1513            other => panic!("did not return Err: {:?}", other)
1514        }
1515
1516        assert!(conn.state.is_write_closed());
1517    }
1518
1519    #[test]
1520    fn test_conn_write_empty_chunk() {
1521        let io = AsyncIo::new_buf(vec![], 0);
1522        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1523        conn.state.writing = Writing::KeepAlive;
1524
1525        assert!(conn.start_send(Frame::Body { chunk: None }).unwrap().is_ready());
1526        assert!(conn.start_send(Frame::Body { chunk: Some(Vec::new().into()) }).unwrap().is_ready());
1527        conn.start_send(Frame::Body { chunk: Some(vec![b'a'].into()) }).unwrap_err();
1528    }
1529    */
1530}