1use std::fmt;
22use std::future::Future;
23use std::pin::Pin;
24use std::sync::{Arc, Mutex};
25use std::task::{self, Poll};
26use std::time::{Duration, Instant};
27
28use h2::{Ping, PingPong};
29
30use crate::common::time::Time;
31use crate::rt::Sleep;
32
/// Window size, in bytes, as reported through `Ponged::SizeUpdate` and
/// accepted as the initial BDP estimate in `Config::bdp_initial_window`.
type WindowSize = u32;
34
35pub(super) fn disabled() -> Recorder {
36 Recorder { shared: None }
37}
38
39pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Recorder, Ponger) {
40 debug_assert!(
41 config.is_enabled(),
42 "ping channel requires bdp or keep-alive config",
43 );
44
45 let bdp = config.bdp_initial_window.map(|wnd| Bdp {
46 bdp: wnd,
47 max_bandwidth: 0.0,
48 rtt: 0.0,
49 ping_delay: Duration::from_millis(100),
50 stable_count: 0,
51 });
52
53 let (bytes, next_bdp_at) = if bdp.is_some() {
54 (Some(0), Some(Instant::now()))
55 } else {
56 (None, None)
57 };
58
59 let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive {
60 interval,
61 timeout: config.keep_alive_timeout,
62 while_idle: config.keep_alive_while_idle,
63 sleep: __timer.sleep(interval),
64 state: KeepAliveState::Init,
65 timer: __timer,
66 });
67
68 let last_read_at = keep_alive.as_ref().map(|_| Instant::now());
69
70 let shared = Arc::new(Mutex::new(Shared {
71 bytes,
72 last_read_at,
73 is_keep_alive_timed_out: false,
74 ping_pong,
75 ping_sent_at: None,
76 next_bdp_at,
77 }));
78
79 (
80 Recorder {
81 shared: Some(shared.clone()),
82 },
83 Ponger {
84 bdp,
85 keep_alive,
86 shared,
87 },
88 )
89}
90
/// Settings that decide which ping-driven features `channel` turns on.
#[derive(Clone)]
pub(super) struct Config {
    /// Starting BDP window estimate; `Some` enables BDP sampling.
    pub(super) bdp_initial_window: Option<WindowSize>,
    /// Time after the last recorded read at which a keep-alive ping becomes
    /// due; `Some` enables keep-alive.
    pub(super) keep_alive_interval: Option<Duration>,
    /// How long to wait for the pong after a keep-alive ping was sent before
    /// the connection is reported as timed out.
    pub(super) keep_alive_timeout: Duration,
    /// Whether keep-alive pings are still scheduled and sent while the
    /// connection is considered idle.
    pub(super) keep_alive_while_idle: bool,
}
102
/// Cloneable handle used to report received frames to the ping machinery.
#[derive(Clone)]
pub(crate) struct Recorder {
    /// State shared with the `Ponger`; `None` for a disabled recorder, in
    /// which case every method is a no-op.
    shared: Option<Arc<Mutex<Shared>>>,
}
107
/// The polling half of the ping channel: watches for pongs and drives the
/// keep-alive timer.
pub(super) struct Ponger {
    /// BDP estimator state; `None` when BDP sampling is disabled.
    bdp: Option<Bdp>,
    /// Keep-alive timer state; `None` when keep-alive is disabled, and reset
    /// to `None` by `poll` once a keep-alive timeout has been reported.
    keep_alive: Option<KeepAlive>,
    /// State shared with every `Recorder` created from the same `channel`.
    shared: Arc<Mutex<Shared>>,
}
113
/// Mutable state shared between the `Recorder` handles and the `Ponger`.
struct Shared {
    /// h2 handle used to send pings and poll for pongs.
    ping_pong: PingPong,
    /// When the currently outstanding ping was sent; `None` when no ping is
    /// in flight.
    ping_sent_at: Option<Instant>,

    // bdp
    /// Bytes recorded since the counter was last reset by a pong.
    /// `None` when BDP sampling is disabled.
    bytes: Option<usize>,
    /// While the current time is before this instant, `record_data` neither
    /// counts bytes nor sends a ping. `None` means there is nothing to wait
    /// for.
    next_bdp_at: Option<Instant>,

    // keep-alive
    /// When a frame (or a pong) was last seen. `None` when keep-alive is
    /// disabled; it is only ever refreshed when it is already `Some`.
    last_read_at: Option<Instant>,

    /// Set once the keep-alive timeout has elapsed; checked by
    /// `Recorder::ensure_not_timed_out`.
    is_keep_alive_timed_out: bool,
}
133
/// State of the bandwidth-delay-product estimator.
struct Bdp {
    /// Current window estimate, in bytes.
    bdp: u32,
    /// Largest bandwidth sample seen so far (bytes per second).
    max_bandwidth: f64,
    /// Moving average of the round-trip time, in seconds; `0.0` until the
    /// first sample has been taken.
    rtt: f64,
    /// Delay added after a pong before `record_data` may start the next BDP
    /// sample. Halved when the estimate grows, quadrupled by
    /// `stabilize_delay` after repeated samples that did not grow it.
    ping_delay: Duration,
    /// Number of samples, since the last growth or back-off, that did not
    /// grow the estimate.
    stable_count: u32,
}
148
/// Keep-alive timer state owned by the `Ponger`.
struct KeepAlive {
    /// Time after the last recorded read at which a keep-alive ping is due.
    interval: Duration,
    /// How long to wait for the pong once a keep-alive ping has been sent.
    timeout: Duration,
    /// Whether pings are scheduled and sent while the connection is idle.
    while_idle: bool,
    /// Where the keep-alive state machine currently is.
    state: KeepAliveState,
    /// The single sleep future, reset to either the next ping deadline or the
    /// pong timeout deadline.
    sleep: Pin<Box<dyn Sleep>>,
    /// Timer used to reset `sleep`.
    timer: Time,
}
161
/// Phases of the keep-alive state machine.
enum KeepAliveState {
    /// Nothing is scheduled yet.
    Init,
    /// The sleep timer is armed; the payload is the deadline it was armed
    /// for (last read time plus the interval at scheduling time).
    Scheduled(Instant),
    /// A keep-alive ping was sent and the sleep timer now tracks the pong
    /// timeout.
    PingSent,
}
167
/// Events produced by `Ponger::poll`.
pub(super) enum Ponged {
    /// The BDP estimator produced a new window size.
    SizeUpdate(WindowSize),
    /// No pong arrived before the keep-alive timeout elapsed.
    KeepAliveTimedOut,
}
172
/// Error value signalling that a keep-alive ping was not answered in time.
///
/// Displays as "keep-alive timed out"; use `crate_error` to wrap it in a
/// `crate::Error`.
#[derive(Debug)]
pub(super) struct KeepAliveTimedOut;
175
176impl Config {
179 pub(super) fn is_enabled(&self) -> bool {
180 self.bdp_initial_window.is_some() || self.keep_alive_interval.is_some()
181 }
182}
183
184impl Recorder {
187 pub(crate) fn record_data(&self, len: usize) {
188 let shared = if let Some(ref shared) = self.shared {
189 shared
190 } else {
191 return;
192 };
193
194 let mut locked = shared.lock().unwrap();
195
196 locked.update_last_read_at();
197
198 if let Some(ref next_bdp_at) = locked.next_bdp_at {
202 if Instant::now() < *next_bdp_at {
203 return;
204 } else {
205 locked.next_bdp_at = None;
206 }
207 }
208
209 if let Some(ref mut bytes) = locked.bytes {
210 *bytes += len;
211 } else {
212 return;
214 }
215
216 if !locked.is_ping_sent() {
217 locked.send_ping();
218 }
219 }
220
221 pub(crate) fn record_non_data(&self) {
222 let shared = if let Some(ref shared) = self.shared {
223 shared
224 } else {
225 return;
226 };
227
228 let mut locked = shared.lock().unwrap();
229
230 locked.update_last_read_at();
231 }
232
233 #[cfg(feature = "client")]
236 pub(super) fn for_stream(self, stream: &h2::RecvStream) -> Self {
237 if stream.is_end_stream() {
238 disabled()
239 } else {
240 self
241 }
242 }
243
244 pub(super) fn ensure_not_timed_out(&self) -> crate::Result<()> {
245 if let Some(ref shared) = self.shared {
246 let locked = shared.lock().unwrap();
247 if locked.is_keep_alive_timed_out {
248 return Err(KeepAliveTimedOut.crate_error());
249 }
250 }
251
252 Ok(())
254 }
255}
256
impl Ponger {
    /// Drives keep-alive scheduling and checks for the pong of an
    /// outstanding ping.
    ///
    /// Returns `Poll::Ready(Ponged::SizeUpdate(..))` when a pong made the BDP
    /// estimator produce a new window size,
    /// `Poll::Ready(Ponged::KeepAliveTimedOut)` when the keep-alive timeout
    /// elapsed while waiting for a pong, and `Poll::Pending` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the shared mutex is poisoned, or if a pong arrives while the
    /// internal invariants (`ping_sent_at` set, `bytes` set when BDP is
    /// enabled) do not hold.
    pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> {
        // Captured before taking the lock; used both for the RTT measurement
        // and as the base for the next BDP sample time.
        let now = Instant::now();
        let mut locked = self.shared.lock().unwrap();
        let is_idle = self.is_idle();

        // Arm the keep-alive timer if needed and send a ping if it is due.
        if let Some(ref mut ka) = self.keep_alive {
            ka.maybe_schedule(is_idle, &locked);
            ka.maybe_ping(cx, is_idle, &mut locked);
        }

        if !locked.is_ping_sent() {
            // NOTE(review): `ping_pong` is not polled on this path, so the
            // only waker registration is whatever `maybe_ping` did with the
            // keep-alive sleep — confirm the caller re-polls when a
            // `Recorder` sends a ping.
            return Poll::Pending;
        }

        match locked.ping_pong.poll_pong(cx) {
            Poll::Ready(Ok(_pong)) => {
                let start = locked
                    .ping_sent_at
                    .expect("pong received implies ping_sent_at");
                locked.ping_sent_at = None;
                let rtt = now - start;
                trace!("recv pong");

                // A pong counts as a read: refresh the timestamp and let the
                // keep-alive state machine reschedule itself.
                if let Some(ref mut ka) = self.keep_alive {
                    locked.update_last_read_at();
                    ka.maybe_schedule(is_idle, &locked);
                    ka.maybe_ping(cx, is_idle, &mut locked);
                }

                if let Some(ref mut bdp) = self.bdp {
                    let bytes = locked.bytes.expect("bdp enabled implies bytes");
                    // Reset the counter for the next sample.
                    locked.bytes = Some(0);
                    trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt);

                    let update = bdp.calculate(bytes, rtt);
                    // `calculate` may have adjusted `ping_delay`, so read it
                    // only afterwards.
                    locked.next_bdp_at = Some(now + bdp.ping_delay);
                    if let Some(update) = update {
                        return Poll::Ready(Ponged::SizeUpdate(update));
                    }
                }
            }
            Poll::Ready(Err(_e)) => {
                // The error is only logged; this poll then reports Pending.
                debug!("pong error: {}", _e);
            }
            Poll::Pending => {
                if let Some(ref mut ka) = self.keep_alive {
                    if let Err(KeepAliveTimedOut) = ka.maybe_timeout(cx) {
                        // Drop the keep-alive state so the timeout is
                        // reported once, and flag it for the recorders.
                        self.keep_alive = None;
                        locked.is_keep_alive_timed_out = true;
                        return Poll::Ready(Ponged::KeepAliveTimedOut);
                    }
                }
            }
        }

        // XXX: this doesn't register a waker...?
        Poll::Pending
    }

    /// Reports whether at most two handles to the shared state exist.
    ///
    /// `channel` creates exactly two (this `Ponger` and one `Recorder`), so
    /// a higher count means additional `Recorder` clones are alive —
    /// presumably one per active stream; verify against callers.
    fn is_idle(&self) -> bool {
        Arc::strong_count(&self.shared) <= 2
    }
}
324
325impl Shared {
328 fn send_ping(&mut self) {
329 match self.ping_pong.send_ping(Ping::opaque()) {
330 Ok(()) => {
331 self.ping_sent_at = Some(Instant::now());
332 trace!("sent ping");
333 }
334 Err(_err) => {
335 debug!("error sending ping: {}", _err);
336 }
337 }
338 }
339
340 fn is_ping_sent(&self) -> bool {
341 self.ping_sent_at.is_some()
342 }
343
344 fn update_last_read_at(&mut self) {
345 if self.last_read_at.is_some() {
346 self.last_read_at = Some(Instant::now());
347 }
348 }
349
350 fn last_read_at(&self) -> Instant {
351 self.last_read_at.expect("keep_alive expects last_read_at")
352 }
353}
354
/// Upper bound for the BDP window estimate: 16 MiB.
///
/// `Bdp::calculate` clamps new estimates to this value and stops sampling
/// math once the estimate has reached it.
const BDP_LIMIT: usize = 1024 * 1024 * 16;
359
360impl Bdp {
361 fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> {
362 if self.bdp as usize == BDP_LIMIT {
364 self.stabilize_delay();
365 return None;
366 }
367
368 let rtt = seconds(rtt);
370 if self.rtt == 0.0 {
371 self.rtt = rtt;
373 } else {
374 self.rtt += (rtt - self.rtt) * 0.125;
376 }
377
378 let bw = (bytes as f64) / (self.rtt * 1.5);
380 trace!("current bandwidth = {:.1}B/s", bw);
381
382 if bw < self.max_bandwidth {
383 self.stabilize_delay();
385 return None;
386 } else {
387 self.max_bandwidth = bw;
388 }
389
390 if bytes >= self.bdp as usize * 2 / 3 {
393 self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize;
394 trace!("BDP increased to {}", self.bdp);
395
396 self.stable_count = 0;
397 self.ping_delay /= 2;
398 Some(self.bdp)
399 } else {
400 self.stabilize_delay();
401 None
402 }
403 }
404
405 fn stabilize_delay(&mut self) {
406 if self.ping_delay < Duration::from_secs(10) {
407 self.stable_count += 1;
408
409 if self.stable_count >= 2 {
410 self.ping_delay *= 4;
411 self.stable_count = 0;
412 }
413 }
414 }
415}
416
/// Converts `dur` to fractional seconds.
///
/// Delegates to the standard library, which computes whole seconds plus
/// sub-second nanoseconds divided by 1e9.
fn seconds(dur: Duration) -> f64 {
    dur.as_secs_f64()
}
422
423impl KeepAlive {
426 fn maybe_schedule(&mut self, is_idle: bool, shared: &Shared) {
427 match self.state {
428 KeepAliveState::Init => {
429 if !self.while_idle && is_idle {
430 return;
431 }
432
433 self.schedule(shared);
434 }
435 KeepAliveState::PingSent => {
436 if shared.is_ping_sent() {
437 return;
438 }
439 self.schedule(shared);
440 }
441 KeepAliveState::Scheduled(..) => (),
442 }
443 }
444
445 fn schedule(&mut self, shared: &Shared) {
446 let interval = shared.last_read_at() + self.interval;
447 self.state = KeepAliveState::Scheduled(interval);
448 self.timer.reset(&mut self.sleep, interval);
449 }
450
451 fn maybe_ping(&mut self, cx: &mut task::Context<'_>, is_idle: bool, shared: &mut Shared) {
452 match self.state {
453 KeepAliveState::Scheduled(at) => {
454 if Pin::new(&mut self.sleep).poll(cx).is_pending() {
455 return;
456 }
457 if shared.last_read_at() + self.interval > at {
459 self.state = KeepAliveState::Init;
460 cx.waker().wake_by_ref(); return;
462 }
463 if !self.while_idle && is_idle {
464 trace!("keep-alive no need to ping when idle and while_idle=false");
465 return;
466 }
467 trace!("keep-alive interval ({:?}) reached", self.interval);
468 shared.send_ping();
469 self.state = KeepAliveState::PingSent;
470 let timeout = Instant::now() + self.timeout;
471 self.timer.reset(&mut self.sleep, timeout);
472 }
473 KeepAliveState::Init | KeepAliveState::PingSent => (),
474 }
475 }
476
477 fn maybe_timeout(&mut self, cx: &mut task::Context<'_>) -> Result<(), KeepAliveTimedOut> {
478 match self.state {
479 KeepAliveState::PingSent => {
480 if Pin::new(&mut self.sleep).poll(cx).is_pending() {
481 return Ok(());
482 }
483 trace!("keep-alive timeout ({:?}) reached", self.timeout);
484 Err(KeepAliveTimedOut)
485 }
486 KeepAliveState::Init | KeepAliveState::Scheduled(..) => Ok(()),
487 }
488 }
489}
490
491impl KeepAliveTimedOut {
494 pub(super) fn crate_error(self) -> crate::Error {
495 crate::Error::new(crate::error::Kind::Http2).with(self)
496 }
497}
498
499impl fmt::Display for KeepAliveTimedOut {
500 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
501 f.write_str("keep-alive timed out")
502 }
503}
504
505impl std::error::Error for KeepAliveTimedOut {
506 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
507 Some(&crate::error::TimedOut)
508 }
509}