h2/proto/streams/
recv.rs

1use super::*;
2use crate::codec::UserError;
3use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE};
4use crate::proto;
5
6use http::{HeaderMap, Request, Response};
7
8use std::cmp::Ordering;
9use std::io;
10use std::task::{Context, Poll, Waker};
11use std::time::Instant;
12
13#[derive(Debug)]
14pub(super) struct Recv {
15    /// Initial window size of remote initiated streams
16    init_window_sz: WindowSize,
17
18    /// Connection level flow control governing received data
19    flow: FlowControl,
20
21    /// Amount of connection window capacity currently used by outstanding streams.
22    in_flight_data: WindowSize,
23
24    /// The lowest stream ID that is still idle
25    next_stream_id: Result<StreamId, StreamIdOverflow>,
26
27    /// The stream ID of the last processed stream
28    last_processed_id: StreamId,
29
30    /// Any streams with a higher ID are ignored.
31    ///
32    /// This starts as MAX, but is lowered when a GOAWAY is received.
33    ///
34    /// > After sending a GOAWAY frame, the sender can discard frames for
35    /// > streams initiated by the receiver with identifiers higher than
36    /// > the identified last stream.
37    max_stream_id: StreamId,
38
39    /// Streams that have pending window updates
40    pending_window_updates: store::Queue<stream::NextWindowUpdate>,
41
42    /// New streams to be accepted
43    pending_accept: store::Queue<stream::NextAccept>,
44
45    /// Locally reset streams that should be reaped when they expire
46    pending_reset_expired: store::Queue<stream::NextResetExpire>,
47
48    /// How long locally reset streams should ignore received frames
49    reset_duration: Duration,
50
51    /// Holds frames that are waiting to be read
52    buffer: Buffer<Event>,
53
54    /// Refused StreamId, this represents a frame that must be sent out.
55    refused: Option<StreamId>,
56
57    /// If push promises are allowed to be received.
58    is_push_enabled: bool,
59
60    /// If extended connect protocol is enabled.
61    is_extended_connect_protocol_enabled: bool,
62}
63
64#[derive(Debug)]
65pub(super) enum Event {
66    Headers(peer::PollMessage),
67    Data(Bytes),
68    Trailers(HeaderMap),
69}
70
71#[derive(Debug)]
72pub(super) enum RecvHeaderBlockError<T> {
73    Oversize(T),
74    State(Error),
75}
76
/// The kind of frame that is opening a stream; passed as `mode` to
/// `Recv::open`.
#[derive(Debug)]
pub(crate) enum Open {
    /// The stream is being opened by a PUSH_PROMISE frame.
    PushPromise,
    /// The stream is being opened by a HEADERS frame.
    Headers,
}
82
83impl Recv {
84    pub fn new(peer: peer::Dyn, config: &Config) -> Self {
85        let next_stream_id = if peer.is_server() { 1 } else { 2 };
86
87        let mut flow = FlowControl::new();
88
89        // connections always have the default window size, regardless of
90        // settings
91        flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE)
92            .expect("invalid initial remote window size");
93        flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE).unwrap();
94
95        Recv {
96            init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
97            flow,
98            in_flight_data: 0 as WindowSize,
99            next_stream_id: Ok(next_stream_id.into()),
100            pending_window_updates: store::Queue::new(),
101            last_processed_id: StreamId::ZERO,
102            max_stream_id: StreamId::MAX,
103            pending_accept: store::Queue::new(),
104            pending_reset_expired: store::Queue::new(),
105            reset_duration: config.local_reset_duration,
106            buffer: Buffer::new(),
107            refused: None,
108            is_push_enabled: config.local_push_enabled,
109            is_extended_connect_protocol_enabled: config.extended_connect_protocol_enabled,
110        }
111    }
112
113    /// Returns the initial receive window size
114    pub fn init_window_sz(&self) -> WindowSize {
115        self.init_window_sz
116    }
117
118    /// Returns the ID of the last processed stream
119    pub fn last_processed_id(&self) -> StreamId {
120        self.last_processed_id
121    }
122
123    /// Update state reflecting a new, remotely opened stream
124    ///
125    /// Returns the stream state if successful. `None` if refused
126    pub fn open(
127        &mut self,
128        id: StreamId,
129        mode: Open,
130        counts: &mut Counts,
131    ) -> Result<Option<StreamId>, Error> {
132        assert!(self.refused.is_none());
133
134        counts.peer().ensure_can_open(id, mode)?;
135
136        let next_id = self.next_stream_id()?;
137        if id < next_id {
138            proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id);
139            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
140        }
141
142        self.next_stream_id = id.next_id();
143
144        if !counts.can_inc_num_recv_streams() {
145            self.refused = Some(id);
146            return Ok(None);
147        }
148
149        Ok(Some(id))
150    }
151
152    /// Transition the stream state based on receiving headers
153    ///
154    /// The caller ensures that the frame represents headers and not trailers.
155    pub fn recv_headers(
156        &mut self,
157        frame: frame::Headers,
158        stream: &mut store::Ptr,
159        counts: &mut Counts,
160    ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> {
161        tracing::trace!("opening stream; init_window={}", self.init_window_sz);
162        let is_initial = stream.state.recv_open(&frame)?;
163
164        if is_initial {
165            // TODO: be smarter about this logic
166            if frame.stream_id() > self.last_processed_id {
167                self.last_processed_id = frame.stream_id();
168            }
169
170            // Increment the number of concurrent streams
171            counts.inc_num_recv_streams(stream);
172        }
173
174        if !stream.content_length.is_head() {
175            use super::stream::ContentLength;
176            use http::header;
177
178            if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) {
179                let content_length = match frame::parse_u64(content_length.as_bytes()) {
180                    Ok(v) => v,
181                    Err(_) => {
182                        proto_err!(stream: "could not parse content-length; stream={:?}", stream.id);
183                        return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
184                    }
185                };
186
187                stream.content_length = ContentLength::Remaining(content_length);
188            }
189        }
190
191        if frame.is_over_size() {
192            // A frame is over size if the decoded header block was bigger than
193            // SETTINGS_MAX_HEADER_LIST_SIZE.
194            //
195            // > A server that receives a larger header block than it is willing
196            // > to handle can send an HTTP 431 (Request Header Fields Too
197            // > Large) status code [RFC6585]. A client can discard responses
198            // > that it cannot process.
199            //
200            // So, if peer is a server, we'll send a 431. In either case,
201            // an error is recorded, which will send a REFUSED_STREAM,
202            // since we don't want any of the data frames either.
203            tracing::debug!(
204                "stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \
205                 recv_headers: frame is over size; stream={:?}",
206                stream.id
207            );
208            return if counts.peer().is_server() && is_initial {
209                let mut res = frame::Headers::new(
210                    stream.id,
211                    frame::Pseudo::response(::http::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE),
212                    HeaderMap::new(),
213                );
214                res.set_end_stream();
215                Err(RecvHeaderBlockError::Oversize(Some(res)))
216            } else {
217                Err(RecvHeaderBlockError::Oversize(None))
218            };
219        }
220
221        let stream_id = frame.stream_id();
222        let (pseudo, fields) = frame.into_parts();
223
224        if pseudo.protocol.is_some()
225            && counts.peer().is_server()
226            && !self.is_extended_connect_protocol_enabled
227        {
228            proto_err!(stream: "cannot use :protocol if extended connect protocol is disabled; stream={:?}", stream.id);
229            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
230        }
231
232        if pseudo.status.is_some() && counts.peer().is_server() {
233            proto_err!(stream: "cannot use :status header for requests; stream={:?}", stream.id);
234            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
235        }
236
237        if !pseudo.is_informational() {
238            let message = counts
239                .peer()
240                .convert_poll_message(pseudo, fields, stream_id)?;
241
242            // Push the frame onto the stream's recv buffer
243            stream
244                .pending_recv
245                .push_back(&mut self.buffer, Event::Headers(message));
246            stream.notify_recv();
247
248            // Only servers can receive a headers frame that initiates the stream.
249            // This is verified in `Streams` before calling this function.
250            if counts.peer().is_server() {
251                // Correctness: never push a stream to `pending_accept` without having the
252                // corresponding headers frame pushed to `stream.pending_recv`.
253                self.pending_accept.push(stream);
254            }
255        }
256
257        Ok(())
258    }
259
260    /// Called by the server to get the request
261    ///
262    /// # Panics
263    ///
264    /// Panics if `stream.pending_recv` has no `Event::Headers` queued.
265    ///
266    pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> {
267        use super::peer::PollMessage::*;
268
269        match stream.pending_recv.pop_front(&mut self.buffer) {
270            Some(Event::Headers(Server(request))) => request,
271            _ => unreachable!("server stream queue must start with Headers"),
272        }
273    }
274
275    /// Called by the client to get pushed response
276    pub fn poll_pushed(
277        &mut self,
278        cx: &Context,
279        stream: &mut store::Ptr,
280    ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>> {
281        use super::peer::PollMessage::*;
282
283        let mut ppp = stream.pending_push_promises.take();
284        let pushed = ppp.pop(stream.store_mut()).map(|mut pushed| {
285            match pushed.pending_recv.pop_front(&mut self.buffer) {
286                Some(Event::Headers(Server(headers))) => (headers, pushed.key()),
287                // When frames are pushed into the queue, it is verified that
288                // the first frame is a HEADERS frame.
289                _ => panic!("Headers not set on pushed stream"),
290            }
291        });
292        stream.pending_push_promises = ppp;
293        if let Some(p) = pushed {
294            Poll::Ready(Some(Ok(p)))
295        } else {
296            let is_open = stream.state.ensure_recv_open()?;
297
298            if is_open {
299                stream.recv_task = Some(cx.waker().clone());
300                Poll::Pending
301            } else {
302                Poll::Ready(None)
303            }
304        }
305    }
306
307    /// Called by the client to get the response
308    pub fn poll_response(
309        &mut self,
310        cx: &Context,
311        stream: &mut store::Ptr,
312    ) -> Poll<Result<Response<()>, proto::Error>> {
313        use super::peer::PollMessage::*;
314
315        // If the buffer is not empty, then the first frame must be a HEADERS
316        // frame or the user violated the contract.
317        match stream.pending_recv.pop_front(&mut self.buffer) {
318            Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)),
319            Some(_) => panic!("poll_response called after response returned"),
320            None => {
321                if !stream.state.ensure_recv_open()? {
322                    proto_err!(stream: "poll_response: stream={:?} is not opened;",  stream.id);
323                    return Poll::Ready(Err(Error::library_reset(
324                        stream.id,
325                        Reason::PROTOCOL_ERROR,
326                    )));
327                }
328
329                stream.recv_task = Some(cx.waker().clone());
330                Poll::Pending
331            }
332        }
333    }
334
335    /// Transition the stream based on receiving trailers
336    pub fn recv_trailers(
337        &mut self,
338        frame: frame::Headers,
339        stream: &mut store::Ptr,
340    ) -> Result<(), Error> {
341        // Transition the state
342        stream.state.recv_close()?;
343
344        if stream.ensure_content_length_zero().is_err() {
345            proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};",  stream.id);
346            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
347        }
348
349        let trailers = frame.into_fields();
350
351        // Push the frame onto the stream's recv buffer
352        stream
353            .pending_recv
354            .push_back(&mut self.buffer, Event::Trailers(trailers));
355        stream.notify_recv();
356
357        Ok(())
358    }
359
360    /// Releases capacity of the connection
361    pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>) {
362        tracing::trace!(
363            "release_connection_capacity; size={}, connection in_flight_data={}",
364            capacity,
365            self.in_flight_data,
366        );
367
368        // Decrement in-flight data
369        self.in_flight_data -= capacity;
370
371        // Assign capacity to connection
372        // TODO: proper error handling
373        let _res = self.flow.assign_capacity(capacity);
374        debug_assert!(_res.is_ok());
375
376        if self.flow.unclaimed_capacity().is_some() {
377            if let Some(task) = task.take() {
378                task.wake();
379            }
380        }
381    }
382
383    /// Releases capacity back to the connection & stream
384    pub fn release_capacity(
385        &mut self,
386        capacity: WindowSize,
387        stream: &mut store::Ptr,
388        task: &mut Option<Waker>,
389    ) -> Result<(), UserError> {
390        tracing::trace!("release_capacity; size={}", capacity);
391
392        if capacity > stream.in_flight_recv_data {
393            return Err(UserError::ReleaseCapacityTooBig);
394        }
395
396        self.release_connection_capacity(capacity, task);
397
398        // Decrement in-flight data
399        stream.in_flight_recv_data -= capacity;
400
401        // Assign capacity to stream
402        // TODO: proper error handling
403        let _res = stream.recv_flow.assign_capacity(capacity);
404        debug_assert!(_res.is_ok());
405
406        if stream.recv_flow.unclaimed_capacity().is_some() {
407            // Queue the stream for sending the WINDOW_UPDATE frame.
408            self.pending_window_updates.push(stream);
409
410            if let Some(task) = task.take() {
411                task.wake();
412            }
413        }
414
415        Ok(())
416    }
417
418    /// Release any unclaimed capacity for a closed stream.
419    pub fn release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
420        debug_assert_eq!(stream.ref_count, 0);
421
422        if stream.in_flight_recv_data == 0 {
423            return;
424        }
425
426        tracing::trace!(
427            "auto-release closed stream ({:?}) capacity: {:?}",
428            stream.id,
429            stream.in_flight_recv_data,
430        );
431
432        self.release_connection_capacity(stream.in_flight_recv_data, task);
433        stream.in_flight_recv_data = 0;
434
435        self.clear_recv_buffer(stream);
436    }
437
438    /// Set the "target" connection window size.
439    ///
440    /// By default, all new connections start with 64kb of window size. As
441    /// streams used and release capacity, we will send WINDOW_UPDATEs for the
442    /// connection to bring it back up to the initial "target".
443    ///
444    /// Setting a target means that we will try to tell the peer about
445    /// WINDOW_UPDATEs so the peer knows it has about `target` window to use
446    /// for the whole connection.
447    ///
448    /// The `task` is an optional parked task for the `Connection` that might
449    /// be blocked on needing more window capacity.
450    pub fn set_target_connection_window(
451        &mut self,
452        target: WindowSize,
453        task: &mut Option<Waker>,
454    ) -> Result<(), Reason> {
455        tracing::trace!(
456            "set_target_connection_window; target={}; available={}, reserved={}",
457            target,
458            self.flow.available(),
459            self.in_flight_data,
460        );
461
462        // The current target connection window is our `available` plus any
463        // in-flight data reserved by streams.
464        //
465        // Update the flow controller with the difference between the new
466        // target and the current target.
467        let current = self
468            .flow
469            .available()
470            .add(self.in_flight_data)?
471            .checked_size();
472        if target > current {
473            self.flow.assign_capacity(target - current)?;
474        } else {
475            self.flow.claim_capacity(current - target)?;
476        }
477
478        // If changing the target capacity means we gained a bunch of capacity,
479        // enough that we went over the update threshold, then schedule sending
480        // a connection WINDOW_UPDATE.
481        if self.flow.unclaimed_capacity().is_some() {
482            if let Some(task) = task.take() {
483                task.wake();
484            }
485        }
486        Ok(())
487    }
488
489    pub(crate) fn apply_local_settings(
490        &mut self,
491        settings: &frame::Settings,
492        store: &mut Store,
493    ) -> Result<(), proto::Error> {
494        if let Some(val) = settings.is_extended_connect_protocol_enabled() {
495            self.is_extended_connect_protocol_enabled = val;
496        }
497
498        if let Some(target) = settings.initial_window_size() {
499            let old_sz = self.init_window_sz;
500            self.init_window_sz = target;
501
502            tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,);
503
504            // Per RFC 7540 §6.9.2:
505            //
506            // In addition to changing the flow-control window for streams that are
507            // not yet active, a SETTINGS frame can alter the initial flow-control
508            // window size for streams with active flow-control windows (that is,
509            // streams in the "open" or "half-closed (remote)" state). When the
510            // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
511            // the size of all stream flow-control windows that it maintains by the
512            // difference between the new value and the old value.
513            //
514            // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
515            // space in a flow-control window to become negative. A sender MUST
516            // track the negative flow-control window and MUST NOT send new
517            // flow-controlled frames until it receives WINDOW_UPDATE frames that
518            // cause the flow-control window to become positive.
519
520            match target.cmp(&old_sz) {
521                Ordering::Less => {
522                    // We must decrease the (local) window on every open stream.
523                    let dec = old_sz - target;
524                    tracing::trace!("decrementing all windows; dec={}", dec);
525
526                    store.try_for_each(|mut stream| {
527                        stream
528                            .recv_flow
529                            .dec_recv_window(dec)
530                            .map_err(proto::Error::library_go_away)?;
531                        Ok::<_, proto::Error>(())
532                    })?;
533                }
534                Ordering::Greater => {
535                    // We must increase the (local) window on every open stream.
536                    let inc = target - old_sz;
537                    tracing::trace!("incrementing all windows; inc={}", inc);
538                    store.try_for_each(|mut stream| {
539                        // XXX: Shouldn't the peer have already noticed our
540                        // overflow and sent us a GOAWAY?
541                        stream
542                            .recv_flow
543                            .inc_window(inc)
544                            .map_err(proto::Error::library_go_away)?;
545                        stream
546                            .recv_flow
547                            .assign_capacity(inc)
548                            .map_err(proto::Error::library_go_away)?;
549                        Ok::<_, proto::Error>(())
550                    })?;
551                }
552                Ordering::Equal => (),
553            }
554        }
555
556        Ok(())
557    }
558
559    pub fn is_end_stream(&self, stream: &store::Ptr) -> bool {
560        if !stream.state.is_recv_closed() {
561            return false;
562        }
563
564        stream.pending_recv.is_empty()
565    }
566
567    pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) -> Result<(), Error> {
568        let sz = frame.payload().len();
569
570        // This should have been enforced at the codec::FramedRead layer, so
571        // this is just a sanity check.
572        assert!(sz <= MAX_WINDOW_SIZE as usize);
573
574        let sz = sz as WindowSize;
575
576        let is_ignoring_frame = stream.state.is_local_error();
577
578        if !is_ignoring_frame && !stream.state.is_recv_streaming() {
579            // TODO: There are cases where this can be a stream error of
580            // STREAM_CLOSED instead...
581
582            // Receiving a DATA frame when not expecting one is a protocol
583            // error.
584            proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id);
585            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
586        }
587
588        tracing::trace!(
589            "recv_data; size={}; connection={}; stream={}",
590            sz,
591            self.flow.window_size(),
592            stream.recv_flow.window_size()
593        );
594
595        if is_ignoring_frame {
596            tracing::trace!(
597                "recv_data; frame ignored on locally reset {:?} for some time",
598                stream.id,
599            );
600            return self.ignore_data(sz);
601        }
602
603        // Ensure that there is enough capacity on the connection before acting
604        // on the stream.
605        self.consume_connection_window(sz)?;
606
607        if stream.recv_flow.window_size() < sz {
608            // http://httpwg.org/specs/rfc7540.html#WINDOW_UPDATE
609            // > A receiver MAY respond with a stream error (Section 5.4.2) or
610            // > connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR if
611            // > it is unable to accept a frame.
612            //
613            // So, for violating the **stream** window, we can send either a
614            // stream or connection error. We've opted to send a stream
615            // error.
616            return Err(Error::library_reset(stream.id, Reason::FLOW_CONTROL_ERROR));
617        }
618
619        if stream.dec_content_length(frame.payload().len()).is_err() {
620            proto_err!(stream:
621                "recv_data: content-length overflow; stream={:?}; len={:?}",
622                stream.id,
623                frame.payload().len(),
624            );
625            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
626        }
627
628        if frame.is_end_stream() {
629            if stream.ensure_content_length_zero().is_err() {
630                proto_err!(stream:
631                    "recv_data: content-length underflow; stream={:?}; len={:?}",
632                    stream.id,
633                    frame.payload().len(),
634                );
635                return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
636            }
637
638            if stream.state.recv_close().is_err() {
639                proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id);
640                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
641            }
642        }
643
644        // Received a frame, but no one cared about it. fix issue#648
645        if !stream.is_recv {
646            tracing::trace!(
647                "recv_data; frame ignored on stream release {:?} for some time",
648                stream.id,
649            );
650            self.release_connection_capacity(sz, &mut None);
651            return Ok(());
652        }
653
654        // Update stream level flow control
655        stream
656            .recv_flow
657            .send_data(sz)
658            .map_err(proto::Error::library_go_away)?;
659
660        // Track the data as in-flight
661        stream.in_flight_recv_data += sz;
662
663        let event = Event::Data(frame.into_payload());
664
665        // Push the frame onto the recv buffer
666        stream.pending_recv.push_back(&mut self.buffer, event);
667        stream.notify_recv();
668
669        Ok(())
670    }
671
672    pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), Error> {
673        // Ensure that there is enough capacity on the connection...
674        self.consume_connection_window(sz)?;
675
676        // Since we are ignoring this frame,
677        // we aren't returning the frame to the user. That means they
678        // have no way to release the capacity back to the connection. So
679        // we have to release it automatically.
680        //
681        // This call doesn't send a WINDOW_UPDATE immediately, just marks
682        // the capacity as available to be reclaimed. When the available
683        // capacity meets a threshold, a WINDOW_UPDATE is then sent.
684        self.release_connection_capacity(sz, &mut None);
685        Ok(())
686    }
687
688    pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), Error> {
689        if self.flow.window_size() < sz {
690            tracing::debug!(
691                "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});",
692                self.flow.window_size(),
693                sz,
694            );
695            return Err(Error::library_go_away(Reason::FLOW_CONTROL_ERROR));
696        }
697
698        // Update connection level flow control
699        self.flow.send_data(sz).map_err(Error::library_go_away)?;
700
701        // Track the data as in-flight
702        self.in_flight_data += sz;
703        Ok(())
704    }
705
706    pub fn recv_push_promise(
707        &mut self,
708        frame: frame::PushPromise,
709        stream: &mut store::Ptr,
710    ) -> Result<(), Error> {
711        stream.state.reserve_remote()?;
712        if frame.is_over_size() {
713            // A frame is over size if the decoded header block was bigger than
714            // SETTINGS_MAX_HEADER_LIST_SIZE.
715            //
716            // > A server that receives a larger header block than it is willing
717            // > to handle can send an HTTP 431 (Request Header Fields Too
718            // > Large) status code [RFC6585]. A client can discard responses
719            // > that it cannot process.
720            //
721            // So, if peer is a server, we'll send a 431. In either case,
722            // an error is recorded, which will send a REFUSED_STREAM,
723            // since we don't want any of the data frames either.
724            tracing::debug!(
725                "stream error REFUSED_STREAM -- recv_push_promise: \
726                 headers frame is over size; promised_id={:?};",
727                frame.promised_id(),
728            );
729            return Err(Error::library_reset(
730                frame.promised_id(),
731                Reason::REFUSED_STREAM,
732            ));
733        }
734
735        let promised_id = frame.promised_id();
736        let (pseudo, fields) = frame.into_parts();
737        let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?;
738
739        if let Err(e) = frame::PushPromise::validate_request(&req) {
740            use PushPromiseHeaderError::*;
741            match e {
742                NotSafeAndCacheable => proto_err!(
743                    stream:
744                    "recv_push_promise: method {} is not safe and cacheable; promised_id={:?}",
745                    req.method(),
746                    promised_id,
747                ),
748                InvalidContentLength(e) => proto_err!(
749                    stream:
750                    "recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}",
751                    e,
752                    promised_id,
753                ),
754            }
755            return Err(Error::library_reset(promised_id, Reason::PROTOCOL_ERROR));
756        }
757
758        use super::peer::PollMessage::*;
759        stream
760            .pending_recv
761            .push_back(&mut self.buffer, Event::Headers(Server(req)));
762        stream.notify_recv();
763        Ok(())
764    }
765
766    /// Ensures that `id` is not in the `Idle` state.
767    pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
768        if let Ok(next) = self.next_stream_id {
769            if id >= next {
770                tracing::debug!(
771                    "stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}",
772                    id
773                );
774                return Err(Reason::PROTOCOL_ERROR);
775            }
776        }
777        // if next_stream_id is overflowed, that's ok.
778
779        Ok(())
780    }
781
782    /// Handle remote sending an explicit RST_STREAM.
783    pub fn recv_reset(
784        &mut self,
785        frame: frame::Reset,
786        stream: &mut Stream,
787        counts: &mut Counts,
788    ) -> Result<(), Error> {
789        // Reseting a stream that the user hasn't accepted is possible,
790        // but should be done with care. These streams will continue
791        // to take up memory in the accept queue, but will no longer be
792        // counted as "concurrent" streams.
793        //
794        // So, we have a separate limit for these.
795        //
796        // See https://github.com/hyperium/hyper/issues/2877
797        if stream.is_pending_accept {
798            if counts.can_inc_num_remote_reset_streams() {
799                counts.inc_num_remote_reset_streams();
800            } else {
801                tracing::warn!(
802                    "recv_reset; remotely-reset pending-accept streams reached limit ({:?})",
803                    counts.max_remote_reset_streams(),
804                );
805                return Err(Error::library_go_away_data(
806                    Reason::ENHANCE_YOUR_CALM,
807                    "too_many_resets",
808                ));
809            }
810        }
811
812        // Notify the stream
813        stream.state.recv_reset(frame, stream.is_pending_send);
814
815        stream.notify_send();
816        stream.notify_recv();
817
818        Ok(())
819    }
820
821    /// Handle a connection-level error
822    pub fn handle_error(&mut self, err: &proto::Error, stream: &mut Stream) {
823        // Receive an error
824        stream.state.handle_error(err);
825
826        // If a receiver is waiting, notify it
827        stream.notify_send();
828        stream.notify_recv();
829    }
830
831    pub fn go_away(&mut self, last_processed_id: StreamId) {
832        assert!(self.max_stream_id >= last_processed_id);
833        self.max_stream_id = last_processed_id;
834    }
835
836    pub fn recv_eof(&mut self, stream: &mut Stream) {
837        stream.state.recv_eof();
838        stream.notify_send();
839        stream.notify_recv();
840    }
841
842    pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) {
843        while stream.pending_recv.pop_front(&mut self.buffer).is_some() {
844            // drop it
845        }
846    }
847
848    /// Get the max ID of streams we can receive.
849    ///
850    /// This gets lowered if we send a GOAWAY frame.
851    pub fn max_stream_id(&self) -> StreamId {
852        self.max_stream_id
853    }
854
855    pub fn next_stream_id(&self) -> Result<StreamId, Error> {
856        if let Ok(id) = self.next_stream_id {
857            Ok(id)
858        } else {
859            Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
860        }
861    }
862
863    pub fn may_have_created_stream(&self, id: StreamId) -> bool {
864        if let Ok(next_id) = self.next_stream_id {
865            // Peer::is_local_init should have been called beforehand
866            debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),);
867            id < next_id
868        } else {
869            true
870        }
871    }
872
873    pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) {
874        if let Ok(next_id) = self.next_stream_id {
875            // !Peer::is_local_init should have been called beforehand
876            debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated());
877            if id >= next_id {
878                self.next_stream_id = id.next_id();
879            }
880        }
881    }
882
883    /// Returns true if the remote peer can reserve a stream with the given ID.
884    pub fn ensure_can_reserve(&self) -> Result<(), Error> {
885        if !self.is_push_enabled {
886            proto_err!(conn: "recv_push_promise: push is disabled");
887            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
888        }
889
890        Ok(())
891    }
892
893    /// Add a locally reset stream to queue to be eventually reaped.
894    pub fn enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
895        if !stream.state.is_local_error() || stream.is_pending_reset_expiration() {
896            return;
897        }
898
899        tracing::trace!("enqueue_reset_expiration; {:?}", stream.id);
900
901        if counts.can_inc_num_reset_streams() {
902            counts.inc_num_reset_streams();
903            self.pending_reset_expired.push(stream);
904        }
905    }
906
907    /// Send any pending refusals.
908    pub fn send_pending_refusal<T, B>(
909        &mut self,
910        cx: &mut Context,
911        dst: &mut Codec<T, Prioritized<B>>,
912    ) -> Poll<io::Result<()>>
913    where
914        T: AsyncWrite + Unpin,
915        B: Buf,
916    {
917        if let Some(stream_id) = self.refused {
918            ready!(dst.poll_ready(cx))?;
919
920            // Create the RST_STREAM frame
921            let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM);
922
923            // Buffer the frame
924            dst.buffer(frame.into()).expect("invalid RST_STREAM frame");
925        }
926
927        self.refused = None;
928
929        Poll::Ready(Ok(()))
930    }
931
932    pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
933        if !self.pending_reset_expired.is_empty() {
934            let now = Instant::now();
935            let reset_duration = self.reset_duration;
936            while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| {
937                let reset_at = stream.reset_at.expect("reset_at must be set if in queue");
938                // rust-lang/rust#86470 tracks a bug in the standard library where `Instant`
939                // subtraction can panic (because, on some platforms, `Instant` isn't actually
940                // monotonic). We use a saturating operation to avoid this panic here.
941                now.saturating_duration_since(reset_at) > reset_duration
942            }) {
943                counts.transition_after(stream, true);
944            }
945        }
946    }
947
948    pub fn clear_queues(
949        &mut self,
950        clear_pending_accept: bool,
951        store: &mut Store,
952        counts: &mut Counts,
953    ) {
954        self.clear_stream_window_update_queue(store, counts);
955        self.clear_all_reset_streams(store, counts);
956
957        if clear_pending_accept {
958            self.clear_all_pending_accept(store, counts);
959        }
960    }
961
962    fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) {
963        while let Some(stream) = self.pending_window_updates.pop(store) {
964            counts.transition(stream, |_, stream| {
965                tracing::trace!("clear_stream_window_update_queue; stream={:?}", stream.id);
966            })
967        }
968    }
969
970    /// Called on EOF
971    fn clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
972        while let Some(stream) = self.pending_reset_expired.pop(store) {
973            counts.transition_after(stream, true);
974        }
975    }
976
977    fn clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts) {
978        while let Some(stream) = self.pending_accept.pop(store) {
979            counts.transition_after(stream, false);
980        }
981    }
982
983    pub fn poll_complete<T, B>(
984        &mut self,
985        cx: &mut Context,
986        store: &mut Store,
987        counts: &mut Counts,
988        dst: &mut Codec<T, Prioritized<B>>,
989    ) -> Poll<io::Result<()>>
990    where
991        T: AsyncWrite + Unpin,
992        B: Buf,
993    {
994        // Send any pending connection level window updates
995        ready!(self.send_connection_window_update(cx, dst))?;
996
997        // Send any pending stream level window updates
998        ready!(self.send_stream_window_updates(cx, store, counts, dst))?;
999
1000        Poll::Ready(Ok(()))
1001    }
1002
1003    /// Send connection level window update
1004    fn send_connection_window_update<T, B>(
1005        &mut self,
1006        cx: &mut Context,
1007        dst: &mut Codec<T, Prioritized<B>>,
1008    ) -> Poll<io::Result<()>>
1009    where
1010        T: AsyncWrite + Unpin,
1011        B: Buf,
1012    {
1013        if let Some(incr) = self.flow.unclaimed_capacity() {
1014            let frame = frame::WindowUpdate::new(StreamId::zero(), incr);
1015
1016            // Ensure the codec has capacity
1017            ready!(dst.poll_ready(cx))?;
1018
1019            // Buffer the WINDOW_UPDATE frame
1020            dst.buffer(frame.into())
1021                .expect("invalid WINDOW_UPDATE frame");
1022
1023            // Update flow control
1024            self.flow
1025                .inc_window(incr)
1026                .expect("unexpected flow control state");
1027        }
1028
1029        Poll::Ready(Ok(()))
1030    }
1031
1032    /// Send stream level window update
1033    pub fn send_stream_window_updates<T, B>(
1034        &mut self,
1035        cx: &mut Context,
1036        store: &mut Store,
1037        counts: &mut Counts,
1038        dst: &mut Codec<T, Prioritized<B>>,
1039    ) -> Poll<io::Result<()>>
1040    where
1041        T: AsyncWrite + Unpin,
1042        B: Buf,
1043    {
1044        loop {
1045            // Ensure the codec has capacity
1046            ready!(dst.poll_ready(cx))?;
1047
1048            // Get the next stream
1049            let stream = match self.pending_window_updates.pop(store) {
1050                Some(stream) => stream,
1051                None => return Poll::Ready(Ok(())),
1052            };
1053
1054            counts.transition(stream, |_, stream| {
1055                tracing::trace!("pending_window_updates -- pop; stream={:?}", stream.id);
1056                debug_assert!(!stream.is_pending_window_update);
1057
1058                if !stream.state.is_recv_streaming() {
1059                    // No need to send window updates on the stream if the stream is
1060                    // no longer receiving data.
1061                    //
1062                    // TODO: is this correct? We could possibly send a window
1063                    // update on a ReservedRemote stream if we already know
1064                    // we want to stream the data faster...
1065                    return;
1066                }
1067
1068                // TODO: de-dup
1069                if let Some(incr) = stream.recv_flow.unclaimed_capacity() {
1070                    // Create the WINDOW_UPDATE frame
1071                    let frame = frame::WindowUpdate::new(stream.id, incr);
1072
1073                    // Buffer it
1074                    dst.buffer(frame.into())
1075                        .expect("invalid WINDOW_UPDATE frame");
1076
1077                    // Update flow control
1078                    stream
1079                        .recv_flow
1080                        .inc_window(incr)
1081                        .expect("unexpected flow control state");
1082                }
1083            })
1084        }
1085    }
1086
1087    pub fn next_incoming(&mut self, store: &mut Store) -> Option<store::Key> {
1088        self.pending_accept.pop(store).map(|ptr| ptr.key())
1089    }
1090
1091    pub fn poll_data(
1092        &mut self,
1093        cx: &Context,
1094        stream: &mut Stream,
1095    ) -> Poll<Option<Result<Bytes, proto::Error>>> {
1096        match stream.pending_recv.pop_front(&mut self.buffer) {
1097            Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))),
1098            Some(event) => {
1099                // Frame is trailer
1100                stream.pending_recv.push_front(&mut self.buffer, event);
1101
1102                // Notify the recv task. This is done just in case
1103                // `poll_trailers` was called.
1104                //
1105                // It is very likely that `notify_recv` will just be a no-op (as
1106                // the task will be None), so this isn't really much of a
1107                // performance concern. It also means we don't have to track
1108                // state to see if `poll_trailers` was called before `poll_data`
1109                // returned `None`.
1110                stream.notify_recv();
1111
1112                // No more data frames
1113                Poll::Ready(None)
1114            }
1115            None => self.schedule_recv(cx, stream),
1116        }
1117    }
1118
1119    pub fn poll_trailers(
1120        &mut self,
1121        cx: &Context,
1122        stream: &mut Stream,
1123    ) -> Poll<Option<Result<HeaderMap, proto::Error>>> {
1124        match stream.pending_recv.pop_front(&mut self.buffer) {
1125            Some(Event::Trailers(trailers)) => Poll::Ready(Some(Ok(trailers))),
1126            Some(event) => {
1127                // Frame is not trailers.. not ready to poll trailers yet.
1128                stream.pending_recv.push_front(&mut self.buffer, event);
1129
1130                Poll::Pending
1131            }
1132            None => self.schedule_recv(cx, stream),
1133        }
1134    }
1135
1136    fn schedule_recv<T>(
1137        &mut self,
1138        cx: &Context,
1139        stream: &mut Stream,
1140    ) -> Poll<Option<Result<T, proto::Error>>> {
1141        if stream.state.ensure_recv_open()? {
1142            // Request to get notified once more frames arrive
1143            stream.recv_task = Some(cx.waker().clone());
1144            Poll::Pending
1145        } else {
1146            // No more frames will be received
1147            Poll::Ready(None)
1148        }
1149    }
1150}
1151
1152// ===== impl Open =====
1153
1154impl Open {
1155    pub fn is_push_promise(&self) -> bool {
1156        matches!(*self, Self::PushPromise)
1157    }
1158}
1159
1160// ===== impl RecvHeaderBlockError =====
1161
1162impl<T> From<Error> for RecvHeaderBlockError<T> {
1163    fn from(err: Error) -> Self {
1164        RecvHeaderBlockError::State(err)
1165    }
1166}