hyper/proto/h2/
ping.rs

/// HTTP2 Ping usage
///
/// hyper uses HTTP2 pings for two purposes:
///
/// 1. Adaptive flow control using BDP
/// 2. Connection keep-alive
///
/// Both cases are optional.
///
/// # BDP Algorithm
///
/// 1. When receiving a DATA frame, if a BDP ping isn't outstanding:
///    1a. Record current time.
///    1b. Send a BDP ping.
/// 2. Increment the number of received bytes.
/// 3. When the BDP ping ack is received:
///    3a. Record duration from sent time.
///    3b. Merge RTT with a running average.
///    3c. Calculate bandwidth as bytes / (rtt * 1.5).
///    3d. If the bandwidth is a new maximum and the sampled bytes are at
///        least 2/3 of the current bdp, set bdp to double the sample
///        (capped at the limit) and update windows.
21use std::fmt;
22use std::future::Future;
23use std::pin::Pin;
24use std::sync::{Arc, Mutex};
25use std::task::{self, Poll};
26use std::time::{Duration, Instant};
27
28use h2::{Ping, PingPong};
29
30use crate::common::time::Time;
31use crate::rt::Sleep;
32
/// Flow-control window size, in bytes, as carried by `Ponged::SizeUpdate`
/// and tracked in `Bdp::bdp`.
type WindowSize = u32;
34
35pub(super) fn disabled() -> Recorder {
36    Recorder { shared: None }
37}
38
39pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Recorder, Ponger) {
40    debug_assert!(
41        config.is_enabled(),
42        "ping channel requires bdp or keep-alive config",
43    );
44
45    let bdp = config.bdp_initial_window.map(|wnd| Bdp {
46        bdp: wnd,
47        max_bandwidth: 0.0,
48        rtt: 0.0,
49        ping_delay: Duration::from_millis(100),
50        stable_count: 0,
51    });
52
53    let (bytes, next_bdp_at) = if bdp.is_some() {
54        (Some(0), Some(Instant::now()))
55    } else {
56        (None, None)
57    };
58
59    let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive {
60        interval,
61        timeout: config.keep_alive_timeout,
62        while_idle: config.keep_alive_while_idle,
63        sleep: __timer.sleep(interval),
64        state: KeepAliveState::Init,
65        timer: __timer,
66    });
67
68    let last_read_at = keep_alive.as_ref().map(|_| Instant::now());
69
70    let shared = Arc::new(Mutex::new(Shared {
71        bytes,
72        last_read_at,
73        is_keep_alive_timed_out: false,
74        ping_pong,
75        ping_sent_at: None,
76        next_bdp_at,
77    }));
78
79    (
80        Recorder {
81            shared: Some(shared.clone()),
82        },
83        Ponger {
84            bdp,
85            keep_alive,
86            shared,
87        },
88    )
89}
90
/// Configuration for the ping machinery created by `channel`.
#[derive(Clone)]
pub(super) struct Config {
    /// If `Some`, BDP sampling is enabled and this value seeds the
    /// estimator's current BDP.
    pub(super) bdp_initial_window: Option<WindowSize>,
    /// If no frames are received in this amount of time, a PING frame is sent.
    pub(super) keep_alive_interval: Option<Duration>,
    /// After sending a keepalive PING, the connection will be closed if
    /// a pong is not received in this amount of time.
    pub(super) keep_alive_timeout: Duration,
    /// If true, sends pings even when there are no active streams.
    pub(super) keep_alive_while_idle: bool,
}
102
/// Cloneable handle used to report received frames to the ping state.
#[derive(Clone)]
pub(crate) struct Recorder {
    /// Shared ping state; `None` for a disabled recorder (see `disabled`).
    shared: Option<Arc<Mutex<Shared>>>,
}
107
/// The polling half of the ping channel: drives keep-alive and handles pongs.
pub(super) struct Ponger {
    /// BDP estimator; `Some` only when BDP sampling is enabled.
    bdp: Option<Bdp>,
    /// Keep-alive state machine; `Some` only when keep-alive is enabled.
    /// Set back to `None` once a keep-alive timeout has been reported.
    keep_alive: Option<KeepAlive>,
    /// State shared with every `Recorder` created from the same channel.
    shared: Arc<Mutex<Shared>>,
}
113
/// State shared between the `Ponger` and its `Recorder`s, guarded by a mutex.
struct Shared {
    /// Handle used to send pings and poll for pongs.
    ping_pong: PingPong,
    /// When the outstanding ping was sent; `None` if no ping is outstanding.
    ping_sent_at: Option<Instant>,

    // bdp
    /// If `Some`, bdp is enabled, and this tracks how many bytes have been
    /// read during the current sample.
    bytes: Option<usize>,
    /// We delay a variable amount of time between BDP pings. This allows us
    /// to send fewer pings as the bandwidth stabilizes.
    next_bdp_at: Option<Instant>,

    // keep-alive
    /// If `Some`, keep-alive is enabled, and the Instant is when
    /// the connection read the last frame.
    last_read_at: Option<Instant>,

    /// Set once the keep-alive timeout has elapsed without a pong.
    is_keep_alive_timed_out: bool,
}
133
/// Running state of the bandwidth-delay-product estimator.
struct Bdp {
    /// Current BDP in bytes
    bdp: u32,
    /// Largest bandwidth we've seen so far.
    max_bandwidth: f64,
    /// Round trip time in seconds
    ///
    /// Kept as a moving average; `0.0` means no sample has been taken yet.
    rtt: f64,
    /// Delay the next ping by this amount.
    ///
    /// This will change depending on how stable the current bandwidth is.
    ping_delay: Duration,
    /// The count of ping round trips where BDP has stayed the same.
    stable_count: u32,
}
148
/// Keep-alive configuration together with its timer and state machine.
struct KeepAlive {
    /// If no frames are received in this amount of time, a PING frame is sent.
    interval: Duration,
    /// After sending a keepalive PING, the connection will be closed if
    /// a pong is not received in this amount of time.
    timeout: Duration,
    /// If true, sends pings even when there are no active streams.
    while_idle: bool,
    /// Where the keep-alive cycle currently is.
    state: KeepAliveState,
    /// Single sleep future, reset and reused both for the wait until the
    /// next ping and for the pong timeout.
    sleep: Pin<Box<dyn Sleep>>,
    /// Timer used to create and reset `sleep`.
    timer: Time,
}
161
/// Phases of the keep-alive cycle.
enum KeepAliveState {
    /// Nothing is scheduled yet.
    Init,
    /// Waiting until the contained deadline (`last_read_at + interval` at
    /// the time of scheduling) before sending a ping.
    Scheduled(Instant),
    /// A keep-alive ping was issued; the sleep now tracks the pong timeout.
    PingSent,
}
167
/// Events produced by `Ponger::poll`.
pub(super) enum Ponged {
    /// The BDP estimate grew; carries the new window size.
    SizeUpdate(WindowSize),
    /// No pong arrived within the keep-alive timeout.
    KeepAliveTimedOut,
}
172
/// Error marker for a keep-alive ping that was not answered in time.
#[derive(Debug)]
pub(super) struct KeepAliveTimedOut;
175
176// ===== impl Config =====
177
178impl Config {
179    pub(super) fn is_enabled(&self) -> bool {
180        self.bdp_initial_window.is_some() || self.keep_alive_interval.is_some()
181    }
182}
183
184// ===== impl Recorder =====
185
186impl Recorder {
187    pub(crate) fn record_data(&self, len: usize) {
188        let shared = if let Some(ref shared) = self.shared {
189            shared
190        } else {
191            return;
192        };
193
194        let mut locked = shared.lock().unwrap();
195
196        locked.update_last_read_at();
197
198        // are we ready to send another bdp ping?
199        // if not, we don't need to record bytes either
200
201        if let Some(ref next_bdp_at) = locked.next_bdp_at {
202            if Instant::now() < *next_bdp_at {
203                return;
204            } else {
205                locked.next_bdp_at = None;
206            }
207        }
208
209        if let Some(ref mut bytes) = locked.bytes {
210            *bytes += len;
211        } else {
212            // no need to send bdp ping if bdp is disabled
213            return;
214        }
215
216        if !locked.is_ping_sent() {
217            locked.send_ping();
218        }
219    }
220
221    pub(crate) fn record_non_data(&self) {
222        let shared = if let Some(ref shared) = self.shared {
223            shared
224        } else {
225            return;
226        };
227
228        let mut locked = shared.lock().unwrap();
229
230        locked.update_last_read_at();
231    }
232
233    /// If the incoming stream is already closed, convert self into
234    /// a disabled reporter.
235    #[cfg(feature = "client")]
236    pub(super) fn for_stream(self, stream: &h2::RecvStream) -> Self {
237        if stream.is_end_stream() {
238            disabled()
239        } else {
240            self
241        }
242    }
243
244    pub(super) fn ensure_not_timed_out(&self) -> crate::Result<()> {
245        if let Some(ref shared) = self.shared {
246            let locked = shared.lock().unwrap();
247            if locked.is_keep_alive_timed_out {
248                return Err(KeepAliveTimedOut.crate_error());
249            }
250        }
251
252        // else
253        Ok(())
254    }
255}
256
257// ===== impl Ponger =====
258
259impl Ponger {
260    pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> {
261        let now = Instant::now();
262        let mut locked = self.shared.lock().unwrap();
263        let is_idle = self.is_idle();
264
265        if let Some(ref mut ka) = self.keep_alive {
266            ka.maybe_schedule(is_idle, &locked);
267            ka.maybe_ping(cx, is_idle, &mut locked);
268        }
269
270        if !locked.is_ping_sent() {
271            // XXX: this doesn't register a waker...?
272            return Poll::Pending;
273        }
274
275        match locked.ping_pong.poll_pong(cx) {
276            Poll::Ready(Ok(_pong)) => {
277                let start = locked
278                    .ping_sent_at
279                    .expect("pong received implies ping_sent_at");
280                locked.ping_sent_at = None;
281                let rtt = now - start;
282                trace!("recv pong");
283
284                if let Some(ref mut ka) = self.keep_alive {
285                    locked.update_last_read_at();
286                    ka.maybe_schedule(is_idle, &locked);
287                    ka.maybe_ping(cx, is_idle, &mut locked);
288                }
289
290                if let Some(ref mut bdp) = self.bdp {
291                    let bytes = locked.bytes.expect("bdp enabled implies bytes");
292                    locked.bytes = Some(0); // reset
293                    trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt);
294
295                    let update = bdp.calculate(bytes, rtt);
296                    locked.next_bdp_at = Some(now + bdp.ping_delay);
297                    if let Some(update) = update {
298                        return Poll::Ready(Ponged::SizeUpdate(update));
299                    }
300                }
301            }
302            Poll::Ready(Err(_e)) => {
303                debug!("pong error: {}", _e);
304            }
305            Poll::Pending => {
306                if let Some(ref mut ka) = self.keep_alive {
307                    if let Err(KeepAliveTimedOut) = ka.maybe_timeout(cx) {
308                        self.keep_alive = None;
309                        locked.is_keep_alive_timed_out = true;
310                        return Poll::Ready(Ponged::KeepAliveTimedOut);
311                    }
312                }
313            }
314        }
315
316        // XXX: this doesn't register a waker...?
317        Poll::Pending
318    }
319
320    fn is_idle(&self) -> bool {
321        Arc::strong_count(&self.shared) <= 2
322    }
323}
324
325// ===== impl Shared =====
326
327impl Shared {
328    fn send_ping(&mut self) {
329        match self.ping_pong.send_ping(Ping::opaque()) {
330            Ok(()) => {
331                self.ping_sent_at = Some(Instant::now());
332                trace!("sent ping");
333            }
334            Err(_err) => {
335                debug!("error sending ping: {}", _err);
336            }
337        }
338    }
339
340    fn is_ping_sent(&self) -> bool {
341        self.ping_sent_at.is_some()
342    }
343
344    fn update_last_read_at(&mut self) {
345        if self.last_read_at.is_some() {
346            self.last_read_at = Some(Instant::now());
347        }
348    }
349
350    fn last_read_at(&self) -> Instant {
351        self.last_read_at.expect("keep_alive expects last_read_at")
352    }
353}
354
355// ===== impl Bdp =====
356
/// Upper bound for the BDP estimate: 16 MiB.
///
/// Any higher than this likely will be hitting the TCP flow control.
const BDP_LIMIT: usize = 16 * 1024 * 1024;
359
360impl Bdp {
361    fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> {
362        // No need to do any math if we're at the limit.
363        if self.bdp as usize == BDP_LIMIT {
364            self.stabilize_delay();
365            return None;
366        }
367
368        // average the rtt
369        let rtt = seconds(rtt);
370        if self.rtt == 0.0 {
371            // First sample means rtt is first rtt.
372            self.rtt = rtt;
373        } else {
374            // Weigh this rtt as 1/8 for a moving average.
375            self.rtt += (rtt - self.rtt) * 0.125;
376        }
377
378        // calculate the current bandwidth
379        let bw = (bytes as f64) / (self.rtt * 1.5);
380        trace!("current bandwidth = {:.1}B/s", bw);
381
382        if bw < self.max_bandwidth {
383            // not a faster bandwidth, so don't update
384            self.stabilize_delay();
385            return None;
386        } else {
387            self.max_bandwidth = bw;
388        }
389
390        // if the current `bytes` sample is at least 2/3 the previous
391        // bdp, increase to double the current sample.
392        if bytes >= self.bdp as usize * 2 / 3 {
393            self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize;
394            trace!("BDP increased to {}", self.bdp);
395
396            self.stable_count = 0;
397            self.ping_delay /= 2;
398            Some(self.bdp)
399        } else {
400            self.stabilize_delay();
401            None
402        }
403    }
404
405    fn stabilize_delay(&mut self) {
406        if self.ping_delay < Duration::from_secs(10) {
407            self.stable_count += 1;
408
409            if self.stable_count >= 2 {
410                self.ping_delay *= 4;
411                self.stable_count = 0;
412            }
413        }
414    }
415}
416
/// Converts a `Duration` into fractional seconds as an `f64`
/// (whole seconds plus nanoseconds / 1e9).
fn seconds(dur: Duration) -> f64 {
    dur.as_secs_f64()
}
422
423// ===== impl KeepAlive =====
424
425impl KeepAlive {
426    fn maybe_schedule(&mut self, is_idle: bool, shared: &Shared) {
427        match self.state {
428            KeepAliveState::Init => {
429                if !self.while_idle && is_idle {
430                    return;
431                }
432
433                self.schedule(shared);
434            }
435            KeepAliveState::PingSent => {
436                if shared.is_ping_sent() {
437                    return;
438                }
439                self.schedule(shared);
440            }
441            KeepAliveState::Scheduled(..) => (),
442        }
443    }
444
445    fn schedule(&mut self, shared: &Shared) {
446        let interval = shared.last_read_at() + self.interval;
447        self.state = KeepAliveState::Scheduled(interval);
448        self.timer.reset(&mut self.sleep, interval);
449    }
450
451    fn maybe_ping(&mut self, cx: &mut task::Context<'_>, is_idle: bool, shared: &mut Shared) {
452        match self.state {
453            KeepAliveState::Scheduled(at) => {
454                if Pin::new(&mut self.sleep).poll(cx).is_pending() {
455                    return;
456                }
457                // check if we've received a frame while we were scheduled
458                if shared.last_read_at() + self.interval > at {
459                    self.state = KeepAliveState::Init;
460                    cx.waker().wake_by_ref(); // schedule us again
461                    return;
462                }
463                if !self.while_idle && is_idle {
464                    trace!("keep-alive no need to ping when idle and while_idle=false");
465                    return;
466                }
467                trace!("keep-alive interval ({:?}) reached", self.interval);
468                shared.send_ping();
469                self.state = KeepAliveState::PingSent;
470                let timeout = Instant::now() + self.timeout;
471                self.timer.reset(&mut self.sleep, timeout);
472            }
473            KeepAliveState::Init | KeepAliveState::PingSent => (),
474        }
475    }
476
477    fn maybe_timeout(&mut self, cx: &mut task::Context<'_>) -> Result<(), KeepAliveTimedOut> {
478        match self.state {
479            KeepAliveState::PingSent => {
480                if Pin::new(&mut self.sleep).poll(cx).is_pending() {
481                    return Ok(());
482                }
483                trace!("keep-alive timeout ({:?}) reached", self.timeout);
484                Err(KeepAliveTimedOut)
485            }
486            KeepAliveState::Init | KeepAliveState::Scheduled(..) => Ok(()),
487        }
488    }
489}
490
491// ===== impl KeepAliveTimedOut =====
492
493impl KeepAliveTimedOut {
494    pub(super) fn crate_error(self) -> crate::Error {
495        crate::Error::new(crate::error::Kind::Http2).with(self)
496    }
497}
498
499impl fmt::Display for KeepAliveTimedOut {
500    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
501        f.write_str("keep-alive timed out")
502    }
503}
504
505impl std::error::Error for KeepAliveTimedOut {
506    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
507        Some(&crate::error::TimedOut)
508    }
509}