1use futures_util::FutureExt;
10use keyed_priority_queue::KeyedPriorityQueue;
11use tokio::sync::mpsc;
12
13use std::{
14 borrow::Borrow,
15 cmp::Reverse,
16 collections::VecDeque,
17 hash::{Hash, Hasher},
18 str::FromStr,
19 sync::{atomic, Arc, Mutex},
20 task::{Context, Poll, Waker},
21 time::{Duration, Instant},
22};
23
24use crate::{
25 conn::{pool::futures::*, Conn},
26 error::*,
27 opts::{Opts, PoolOpts},
28 queryable::transaction::{Transaction, TxOpts},
29};
30
31pub use metrics::Metrics;
32
33mod recycler;
34pub mod futures;
36mod metrics;
37mod ttl_check_inerval;
38
/// A connection that currently sits idle in the pool.
#[derive(Debug)]
struct IdlingConn {
    /// The instant at which this entry was created (see `From<Conn>`),
    /// i.e. the moment the connection started idling.
    since: Instant,
    /// The idle connection itself.
    conn: Conn,
}
47
48impl IdlingConn {
49 fn expired(&self) -> bool {
51 self.conn
52 .inner
53 .ttl_deadline
54 .map(|t| Instant::now() > t)
55 .unwrap_or_default()
56 }
57
58 fn elapsed(&self) -> Duration {
60 self.since.elapsed()
61 }
62}
63
64impl From<Conn> for IdlingConn {
65 fn from(conn: Conn) -> Self {
66 Self {
67 since: Instant::now(),
68 conn,
69 }
70 }
71}
72
/// Mutable pool state; it lives behind the `Mutex` in [`Inner`].
#[derive(Debug)]
struct Exchange {
    /// Wakers of the tasks that are waiting for a connection.
    waiting: Waitlist,
    /// Idle connections that are ready to be handed out.
    available: VecDeque<IdlingConn>,
    /// Number of connections accounted to this pool. It is incremented when
    /// the pool decides to open a new connection and decremented in
    /// `Pool::cancel_connection`.
    exist: usize,
    /// Receiving half of the channel that `Pool::send_to_recycler` writes to,
    /// together with the pool options. It is `Some` until the background
    /// futures are spawned by `spawn_futures_if_needed`, `None` afterwards.
    recycler: Option<(mpsc::UnboundedReceiver<Option<Conn>>, PoolOpts)>,
}
85
86impl Exchange {
87 fn spawn_futures_if_needed(&mut self, inner: &Arc<Inner>) {
90 use recycler::Recycler;
91 use ttl_check_inerval::TtlCheckInterval;
92 if let Some((dropped, pool_opts)) = self.recycler.take() {
93 tokio::spawn(Recycler::new(pool_opts.clone(), inner.clone(), dropped));
95
96 if pool_opts.inactive_connection_ttl() > Duration::ZERO
99 || pool_opts.abs_conn_ttl().is_some()
100 {
101 tokio::spawn(TtlCheckInterval::new(pool_opts, inner.clone()));
102 }
103 }
104 }
105}
106
/// Priority queue of wakers belonging to tasks that wait for a connection.
#[derive(Default, Debug)]
struct Waitlist {
    /// Wakers keyed by themselves (they hash/compare by `queue_id`) and
    /// prioritized by their [`QueueId`].
    queue: KeyedPriorityQueue<QueuedWaker, QueueId>,
}
111
112impl Waitlist {
113 fn push(&mut self, waker: Waker, queue_id: QueueId) -> bool {
115 let occupied = self.remove(queue_id);
127 self.queue.push(QueuedWaker { queue_id, waker }, queue_id);
128 !occupied
129 }
130
131 fn pop(&mut self) -> Option<Waker> {
132 match self.queue.pop() {
133 Some((qw, _)) => Some(qw.waker),
134 None => None,
135 }
136 }
137
138 fn remove(&mut self, id: QueueId) -> bool {
140 self.queue.remove(&id).is_some()
141 }
142
143 fn peek_id(&mut self) -> Option<QueueId> {
144 self.queue.peek().map(|(qw, _)| qw.queue_id)
145 }
146}
147
/// The queue id with the lowest possible priority.
const QUEUE_END_ID: QueueId = QueueId(Reverse(u64::MAX));

/// Identifier of a queued request for a connection.
///
/// The wrapped counter is stored as `Reverse`, so an id created earlier
/// compares greater than (i.e. has priority over) an id created later.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub(crate) struct QueueId(Reverse<u64>);

impl QueueId {
    /// Returns a fresh id taken from a process-wide monotonic counter.
    fn next() -> Self {
        static COUNTER: atomic::AtomicU64 = atomic::AtomicU64::new(0);
        let sequence = COUNTER.fetch_add(1, atomic::Ordering::SeqCst);
        Self(Reverse(sequence))
    }
}
160
161#[derive(Debug)]
162struct QueuedWaker {
163 queue_id: QueueId,
164 waker: Waker,
165}
166
167impl Eq for QueuedWaker {}
168
169impl Borrow<QueueId> for QueuedWaker {
170 fn borrow(&self) -> &QueueId {
171 &self.queue_id
172 }
173}
174
175impl PartialEq for QueuedWaker {
176 fn eq(&self, other: &Self) -> bool {
177 self.queue_id == other.queue_id
178 }
179}
180
181impl Hash for QueuedWaker {
182 fn hash<H: Hasher>(&self, state: &mut H) {
183 self.queue_id.hash(state)
184 }
185}
186
/// State shared by all clones of a [`Pool`].
#[derive(Debug)]
pub struct Inner {
    /// Pool metrics; handed out by `Pool::metrics`.
    metrics: Arc<Metrics>,
    /// When set, `Pool::poll_new_conn` refuses to hand out connections and
    /// fails with `DriverError::PoolDisconnected`.
    // NOTE(review): the code that sets this flag is not in this file section.
    close: atomic::AtomicBool,
    /// Consulted by `Pool::send_to_recycler` when the recycler channel is gone.
    // NOTE(review): presumably set once the recycler has exited cleanly — confirm.
    closed: atomic::AtomicBool,
    /// Waitlist, idle connections and connection accounting.
    exchange: Mutex<Exchange>,
}
195
/// Asynchronous pool of connections.
///
/// Cloning is cheap with respect to pool state: the state lives in an `Arc`,
/// so every clone refers to the same pool.
#[derive(Debug, Clone)]
pub struct Pool {
    /// Options used for connections opened by this pool; also carries the
    /// pool options.
    opts: Opts,
    /// Shared pool state.
    inner: Arc<Inner>,
    /// Sending half of the channel over which connections are handed to the
    /// recycler (see `send_to_recycler`).
    drop: mpsc::UnboundedSender<Option<Conn>>,
}
210
211impl Pool {
212 pub fn new<O>(opts: O) -> Pool
218 where
219 Opts: TryFrom<O>,
220 <Opts as TryFrom<O>>::Error: std::error::Error,
221 {
222 let opts = Opts::try_from(opts).unwrap();
223 let pool_opts = opts.pool_opts().clone();
224 let (tx, rx) = mpsc::unbounded_channel();
225 Pool {
226 opts,
227 inner: Arc::new(Inner {
228 close: false.into(),
229 closed: false.into(),
230 metrics: Arc::new(Metrics::default()),
231 exchange: Mutex::new(Exchange {
232 available: VecDeque::with_capacity(pool_opts.constraints().max()),
233 waiting: Waitlist::default(),
234 exist: 0,
235 recycler: Some((rx, pool_opts)),
236 }),
237 }),
238 drop: tx,
239 }
240 }
241
242 pub fn metrics(&self) -> Arc<Metrics> {
244 self.inner.metrics.clone()
245 }
246
247 pub fn from_url<T: AsRef<str>>(url: T) -> Result<Pool> {
249 let opts = Opts::from_str(url.as_ref())?;
250 Ok(Pool::new(opts))
251 }
252
253 pub fn get_conn(&self) -> GetConn {
255 let reset_connection = self.opts.pool_opts().reset_connection();
256 GetConn::new(self, reset_connection)
257 }
258
259 pub async fn start_transaction(&self, options: TxOpts) -> Result<Transaction<'static>> {
261 let conn = self.get_conn().await?;
262 Transaction::new(conn, options).await
263 }
264
265 pub fn disconnect(self) -> DisconnectPool {
270 DisconnectPool::new(self)
271 }
272
273 fn return_conn(&mut self, conn: Conn) {
275 self.send_to_recycler(conn);
278 }
279
280 fn send_to_recycler(&self, conn: Conn) {
281 if let Err(conn) = self.drop.send(Some(conn)) {
282 let conn = conn.0.unwrap();
283
284 if !self.inner.closed.load(atomic::Ordering::SeqCst) {
287 assert!(conn.inner.pool.is_none());
290 drop(conn);
291 } else {
292 unreachable!("Recycler exited while connections still exist");
293 }
294 }
295 }
296
297 pub(super) fn cancel_connection(&self) {
302 let mut exchange = self.inner.exchange.lock().unwrap();
303 exchange.exist -= 1;
304 self.inner
305 .metrics
306 .create_failed
307 .fetch_add(1, atomic::Ordering::Relaxed);
308 self.inner
309 .metrics
310 .connection_count
311 .store(exchange.exist, atomic::Ordering::Relaxed);
312 if let Some(w) = exchange.waiting.pop() {
314 w.wake();
315 }
316 }
317
318 fn poll_new_conn(
320 &mut self,
321 cx: &mut Context<'_>,
322 queue_id: QueueId,
323 ) -> Poll<Result<GetConnInner>> {
324 let mut exchange = self.inner.exchange.lock().unwrap();
325
326 if self.inner.close.load(atomic::Ordering::Acquire) {
330 return Err(Error::Driver(DriverError::PoolDisconnected)).into();
331 }
332
333 exchange.spawn_futures_if_needed(&self.inner);
334
335 let highest = if let Some(cur) = exchange.waiting.peek_id() {
337 queue_id > cur
338 } else {
339 true
340 };
341
342 if !highest {
344 if exchange.waiting.push(cx.waker().clone(), queue_id) {
345 self.inner
346 .metrics
347 .active_wait_requests
348 .fetch_add(1, atomic::Ordering::Relaxed);
349 }
350 return Poll::Pending;
351 }
352
353 #[allow(unused_variables)] while let Some(IdlingConn { mut conn, since }) = exchange.available.pop_back() {
355 if !conn.expired() {
356 #[cfg(feature = "hdrhistogram")]
357 self.inner
358 .metrics
359 .connection_idle_duration
360 .lock()
361 .unwrap()
362 .saturating_record(since.elapsed().as_micros() as u64);
363 #[cfg(feature = "hdrhistogram")]
364 let metrics = self.metrics();
365 conn.inner.active_since = Instant::now();
366 return Poll::Ready(Ok(GetConnInner::Checking(
367 async move {
368 conn.stream_mut()?.check().await?;
369 #[cfg(feature = "hdrhistogram")]
370 metrics
371 .check_duration
372 .lock()
373 .unwrap()
374 .saturating_record(
375 conn.inner.active_since.elapsed().as_micros() as u64
376 );
377 Ok(conn)
378 }
379 .boxed(),
380 )));
381 } else {
382 self.send_to_recycler(conn);
383 }
384 }
385
386 self.inner
387 .metrics
388 .connections_in_pool
389 .store(exchange.available.len(), atomic::Ordering::Relaxed);
390
391 if exchange.exist < self.opts.pool_opts().constraints().max() {
394 exchange.exist += 1;
396
397 self.inner
398 .metrics
399 .connection_count
400 .store(exchange.exist, atomic::Ordering::Relaxed);
401
402 let opts = self.opts.clone();
403 #[cfg(feature = "hdrhistogram")]
404 let metrics = self.metrics();
405
406 return Poll::Ready(Ok(GetConnInner::Connecting(
407 async move {
408 let conn = Conn::new(opts).await;
409 #[cfg(feature = "hdrhistogram")]
410 if let Ok(conn) = &conn {
411 metrics
412 .connect_duration
413 .lock()
414 .unwrap()
415 .saturating_record(
416 conn.inner.active_since.elapsed().as_micros() as u64
417 );
418 }
419 conn
420 }
421 .boxed(),
422 )));
423 }
424
425 if exchange.waiting.push(cx.waker().clone(), queue_id) {
427 self.inner
428 .metrics
429 .active_wait_requests
430 .fetch_add(1, atomic::Ordering::Relaxed);
431 }
432 Poll::Pending
433 }
434
435 fn unqueue(&self, queue_id: QueueId) {
436 let mut exchange = self.inner.exchange.lock().unwrap();
437 if exchange.waiting.remove(queue_id) {
438 self.inner
439 .metrics
440 .active_wait_requests
441 .fetch_sub(1, atomic::Ordering::Relaxed);
442 }
443 }
444}
445
446impl Drop for Conn {
447 fn drop(&mut self) {
448 self.inner.infile_handler = None;
449
450 if std::thread::panicking() {
451 if let Some(pool) = self.inner.pool.take() {
453 pool.cancel_connection();
454 }
455
456 return;
457 }
458
459 if let Some(mut pool) = self.inner.pool.take() {
460 pool.return_conn(self.take());
461 } else if self.inner.stream.is_some() && !self.inner.disconnected {
462 crate::conn::disconnect(self.take());
463 }
464 }
465}
466
// Tests for the connection pool. Apart from `waitlist_integrity`, they use
// the options returned by `get_opts()` and talk to a database server.
#[cfg(test)]
mod test {
    use futures_util::{
        future::{join_all, select, select_all, try_join_all, Either},
        poll, try_join, FutureExt,
    };
    use tokio::time::{sleep, timeout};
    use waker_fn::waker_fn;

    use std::{
        cmp::Reverse,
        future::Future,
        pin::pin,
        sync::{Arc, OnceLock},
        task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
        time::Duration,
    };

    use crate::{
        conn::pool::{Pool, QueueId, Waitlist, QUEUE_END_ID},
        opts::PoolOpts,
        prelude::*,
        test_misc::get_opts,
        PoolConstraints, Row, TxOpts, Value,
    };

    // Accesses `$field` of the exchange of the pool that `$conn` belongs to.
    // Panics if the connection has no pool.
    macro_rules! conn_ex_field {
        ($conn:expr, $field:tt) => {
            ex_field!($conn.inner.pool.as_ref().unwrap(), $field)
        };
    }

    // Accesses `$field` of the exchange of `$pool` (locks the exchange for
    // the duration of the expression).
    macro_rules! ex_field {
        ($pool:expr, $field:tt) => {
            $pool.inner.exchange.lock().unwrap().$field
        };
    }

    /// Builds a pool constrained to exactly one connection (min = max = 1).
    fn pool_with_one_connection() -> Pool {
        let pool_opts = PoolOpts::new().with_constraints(PoolConstraints::new(1, 1).unwrap());
        let opts = get_opts().pool_opts(pool_opts.clone());
        Pool::new(opts)
    }

    /// Session state (`@foo`) is gone after a connection went through the
    /// pool, unless reset is disabled on the connection or on the pool.
    #[tokio::test]
    async fn should_opt_out_of_connection_reset() -> super::Result<()> {
        let pool_opts = PoolOpts::new().with_constraints(PoolConstraints::new(1, 1).unwrap());
        let opts = get_opts().pool_opts(pool_opts.clone());

        let pool = Pool::new(opts.clone());

        // Fresh connection: the variable is unset, then we set it.
        let mut conn = pool.get_conn().await.unwrap();
        assert_eq!(
            conn.query_first::<Value, _>("SELECT @foo").await?.unwrap(),
            Value::NULL
        );
        conn.query_drop("SET @foo = 'foo'").await?;
        assert_eq!(
            conn.query_first::<String, _>("SELECT @foo").await?.unwrap(),
            "foo",
        );
        drop(conn);

        // After a round trip through the pool the variable is unset again.
        conn = pool.get_conn().await.unwrap();
        assert_eq!(
            conn.query_first::<Value, _>("SELECT @foo").await?.unwrap(),
            Value::NULL
        );
        conn.query_drop("SET @foo = 'foo'").await?;
        // Opt out of the reset for this particular connection.
        conn.reset_connection(false);
        drop(conn);

        // The variable survived the round trip.
        conn = pool.get_conn().await.unwrap();
        assert_eq!(
            conn.query_first::<String, _>("SELECT @foo").await?.unwrap(),
            "foo",
        );
        drop(conn);
        pool.disconnect().await.unwrap();

        // Same check with the reset disabled at the pool level.
        let pool = Pool::new(opts.pool_opts(pool_opts.with_reset_connection(false)));
        conn = pool.get_conn().await.unwrap();
        conn.query_drop("SET @foo = 'foo'").await?;
        drop(conn);
        conn = pool.get_conn().await.unwrap();
        assert_eq!(
            conn.query_first::<String, _>("SELECT @foo").await?.unwrap(),
            "foo",
        );
        drop(conn);
        pool.disconnect().await
    }

    /// Disconnecting a pool that never handed out a connection must complete.
    #[test]
    fn should_not_hang() -> super::Result<()> {
        pub struct Database {
            pool: Pool,
        }

        impl Database {
            pub async fn disconnect(self) -> super::Result<()> {
                self.pool.disconnect().await?;
                Ok(())
            }
        }

        let runtime = tokio::runtime::Runtime::new().unwrap();
        let database = Database {
            pool: Pool::new(get_opts()),
        };
        runtime.block_on(database.disconnect())
    }

    /// The pool can be disconnected after one of its connections was
    /// disconnected explicitly instead of being returned.
    #[tokio::test]
    async fn should_track_conn_if_disconnected_outside_of_a_pool() -> super::Result<()> {
        let pool = Pool::new(get_opts());
        let conn = pool.get_conn().await?;
        conn.disconnect().await?;
        pool.disconnect().await?;
        Ok(())
    }

    /// Smoke test: get a connection, ping, disconnect.
    #[tokio::test]
    async fn should_connect() -> super::Result<()> {
        let pool = Pool::new(crate::Opts::from(get_opts()));
        pool.get_conn().await?.ping().await?;
        pool.disconnect().await?;
        Ok(())
    }

    /// Connections killed on the server side must be discarded by the pool
    /// when a new connection is requested.
    #[tokio::test]
    async fn should_reconnect() -> super::Result<()> {
        // Separate connection used to issue the `KILL` statements.
        let mut master = crate::Conn::new(get_opts()).await?;

        async fn test(master: &mut crate::Conn, opts: crate::OptsBuilder) -> super::Result<()> {
            const NUM_CONNS: usize = 5;
            let pool = Pool::new(opts);

            // Create some connections..
            let connections = (0..NUM_CONNS).map(|_| {
                async {
                    let mut conn = pool.get_conn().await?;
                    conn.ping().await?;
                    crate::Result::Ok(conn)
                }
                .boxed()
            });

            // ..and collect their ids (this drops the connections).
            let ids = try_join_all(connections)
                .await?
                .into_iter()
                .map(|conn| conn.id())
                .collect::<Vec<_>>();

            // Give the dropped connections time to get back into the pool.
            sleep(Duration::from_millis(1000)).await;

            // `get_conn` should work while the connections are alive.
            pool.get_conn().await?;

            // Now kill the connections on the server side.
            for id in ids {
                master.query_drop(format!("KILL {}", id)).await?;
            }

            // The pool has not noticed yet: they are all still idle.
            assert_eq!(ex_field!(pool, available).len(), NUM_CONNS);

            sleep(Duration::from_millis(500)).await;

            // Request a new connection..
            let _conn = pool.get_conn().await?;

            // ..and verify that no idle connection is left.
            assert_eq!(ex_field!(pool, available).len(), 0);

            drop(_conn);
            pool.disconnect().await
        }

        println!("Check socket/pipe..");
        test(&mut master, get_opts()).await?;

        println!("Check tcp..");
        test(&mut master, get_opts().prefer_socket(false)).await?;

        master.disconnect().await
    }

    /// With a single-connection pool, every `get_conn` yields the same
    /// server connection (checked for server versions >= 5.7.2 only).
    #[tokio::test]
    async fn should_reuse_connections() -> super::Result<()> {
        let pool = pool_with_one_connection();
        let mut conn = pool.get_conn().await?;

        let server_version = conn.server_version();
        let connection_id = conn.id();

        for _ in 0..16 {
            drop(conn);
            conn = pool.get_conn().await?;
            println!("CONN connection_id={}", conn.id());
            assert!(conn.id() == connection_id || server_version < (5, 7, 2));
        }

        Ok(())
    }

    /// Stress test (ignored by default): 10 rounds of 10_000 concurrent
    /// `get_conn` calls.
    #[tokio::test]
    #[ignore]
    async fn can_handle_the_pressure() {
        let pool = Pool::new(get_opts());
        for _ in 0..10i32 {
            let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
            for i in 0..10_000 {
                let pool = pool.clone();
                let tx = tx.clone();
                tokio::spawn(async move {
                    let _ = pool.get_conn().await.unwrap();
                    tx.send(i).unwrap();
                });
            }
            // Drop our sender so that `recv` ends once all tasks are done.
            drop(tx);
            while (rx.recv().await).is_some() {}
        }
        drop(pool);
    }

    /// A transaction that is dropped without commit must leave no rows.
    #[tokio::test]
    async fn should_start_transaction() -> super::Result<()> {
        let pool = pool_with_one_connection();

        "CREATE TABLE IF NOT EXISTS mysql.tmp(id int)"
            .ignore(&pool)
            .await?;
        "DELETE FROM mysql.tmp".ignore(&pool).await?;

        let mut tx = pool.start_transaction(TxOpts::default()).await?;
        tx.exec_batch(
            "INSERT INTO mysql.tmp (id) VALUES (?)",
            vec![(1_u8,), (2_u8,)],
        )
        .await?;
        tx.exec_drop("SELECT * FROM mysql.tmp", ()).await?;
        // Dropped without commit.
        drop(tx);
        let row_opt = pool
            .get_conn()
            .await?
            .query_first("SELECT COUNT(*) FROM mysql.tmp")
            .await?;
        assert_eq!(row_opt, Some((0u8,)));
        pool.get_conn()
            .await?
            .query_drop("DROP TABLE mysql.tmp")
            .await?;
        pool.disconnect().await?;
        Ok(())
    }

    /// Idle connections above the pool minimum are removed once the
    /// inactive-connection TTL elapsed and the TTL check ran.
    #[tokio::test]
    async fn should_check_inactive_connection_ttl() -> super::Result<()> {
        const POOL_MIN: usize = 5;
        const POOL_MAX: usize = 10;

        const INACTIVE_CONNECTION_TTL: Duration = Duration::from_millis(500);
        const TTL_CHECK_INTERVAL: Duration = Duration::from_secs(1);

        let constraints = PoolConstraints::new(POOL_MIN, POOL_MAX).unwrap();
        let pool_opts = PoolOpts::default()
            .with_constraints(constraints)
            .with_inactive_connection_ttl(INACTIVE_CONNECTION_TTL)
            .with_ttl_check_interval(TTL_CHECK_INTERVAL);

        let pool = Pool::new(get_opts().pool_opts(pool_opts));
        let pool_clone = pool.clone();
        let conns = (0..POOL_MAX).map(|_| pool.get_conn()).collect::<Vec<_>>();

        let conns = try_join_all(conns).await?;

        assert_eq!(ex_field!(pool_clone, exist), POOL_MAX);
        drop(conns);

        // Wait for the connections to be returned to the pool.
        sleep(Duration::from_millis(100)).await;

        // All connections are idling now.
        assert_eq!(ex_field!(pool_clone, available).len(), POOL_MAX);

        // Wait for the TTL check to kick in..
        sleep(TTL_CHECK_INTERVAL).await;

        // ..plus some extra time for the connections to be removed.
        sleep(Duration::from_millis(500)).await;

        // Only the minimum number of idle connections is left.
        assert_eq!(ex_field!(pool_clone, available).len(), POOL_MIN);

        Ok(())
    }

    /// The number of existing connections follows the pool bounds while
    /// connections are dropped one at a time.
    #[tokio::test]
    async fn aa_should_hold_bounds2() -> super::Result<()> {
        use std::cmp::min;

        const POOL_MIN: usize = 5;
        const POOL_MAX: usize = 10;

        let constraints = PoolConstraints::new(POOL_MIN, POOL_MAX).unwrap();
        let pool_opts = PoolOpts::default().with_constraints(constraints);

        let pool = Pool::new(get_opts().pool_opts(pool_opts));
        let pool_clone = pool.clone();
        let conns = (0..POOL_MAX).map(|_| pool.get_conn()).collect::<Vec<_>>();

        let mut conns = try_join_all(conns).await?;

        // We hold all the connections, so the pool is at its maximum.
        assert_eq!(ex_field!(pool_clone, exist), POOL_MAX);

        while !conns.is_empty() {
            // Drop one connection..
            let _ = conns.pop();

            // ..and wait a bit to let it be reclaimed.
            sleep(Duration::from_millis(500)).await;

            // Expected count = connections still held + idle connections,
            // where the number of idle ones is capped by POOL_MIN.
            let dropped = POOL_MAX - conns.len();
            let idle = min(dropped, POOL_MIN);
            let expected = conns.len() + idle;

            let have = ex_field!(pool_clone, exist);
            assert_eq!(have, expected);
        }

        pool.disconnect().await?;
        Ok(())
    }

    /// Idle list bookkeeping for a pool constrained to 1..=2 connections.
    #[tokio::test]
    async fn should_hold_bounds1() -> super::Result<()> {
        let constraints = PoolConstraints::new(1, 2).unwrap();
        let opts = get_opts().pool_opts(PoolOpts::default().with_constraints(constraints));
        let pool = Pool::new(opts);
        let pool_clone = pool.clone();

        let (conn1, conn2) = try_join!(pool.get_conn(), pool.get_conn()).unwrap();

        assert_eq!(conn_ex_field!(conn1, exist), 2);
        assert_eq!(conn_ex_field!(conn1, available).len(), 0);

        drop(conn1);
        drop(conn2);
        // With both connections dropped, the idle list is expected to be
        // empty again once a new connection has been obtained.
        let conn1 = pool_clone.get_conn().await?;
        assert_eq!(conn_ex_field!(conn1, available).len(), 0);

        drop(conn1);

        // The connection goes back to idle, but possibly not _yet_.
        assert!(ex_field!(pool, available).len() <= 1);
        pool.disconnect().await?;
        Ok(())
    }

    /// Failed connection attempts must not stay accounted in `exist`.
    #[tokio::test]
    async fn should_hold_bounds_on_error() -> super::Result<()> {
        // This address is expected to be unreachable.
        let pool = Pool::new("mysql://255.255.255.255");

        assert!(try_join!(pool.get_conn(), pool.get_conn()).is_err());
        assert_eq!(ex_field!(pool, exist), 0);
        Ok(())
    }

    /// A pooled connection that outlived the server's `wait_timeout` must be
    /// replaced by a new one on `get_conn`.
    ///
    /// Note: this test temporarily changes the GLOBAL `wait_timeout`.
    #[tokio::test]
    async fn zz_should_check_wait_timeout_on_get_conn() -> super::Result<()> {
        let pool = Pool::new(get_opts());

        // Remember the original value and lower the global timeout.
        let mut conn = pool.get_conn().await?;
        let wait_timeout_orig: Option<usize> = conn.query_first("SELECT @@wait_timeout").await?;
        conn.query_drop("SET GLOBAL wait_timeout = 3").await?;
        conn.disconnect().await?;

        // A new connection picks up the lowered timeout.
        let mut conn = pool.get_conn().await?;
        let wait_timeout: Option<usize> = conn.query_first("SELECT @@wait_timeout").await?;
        let id1: Option<usize> = conn.query_first("SELECT CONNECTION_ID()").await?;
        drop(conn);

        assert_eq!(wait_timeout, Some(3));
        assert_eq!(ex_field!(pool, exist), 1);

        // Sleep for longer than the timeout.
        sleep(Duration::from_secs(6)).await;

        // The pool must hand out a different server connection now.
        let mut conn = pool.get_conn().await?;
        let id2: Option<usize> = conn.query_first("SELECT CONNECTION_ID()").await?;
        assert_eq!(ex_field!(pool, exist), 1);
        assert_ne!(id1, id2);

        // Restore the original global timeout.
        conn.exec_drop("SET GLOBAL wait_timeout = ?", (wait_timeout_orig,))
            .await?;
        drop(conn);

        pool.disconnect().await?;

        Ok(())
    }

    /// Dropping the pool and its connections in either order must not panic.
    #[tokio::test]
    async fn droptest() -> super::Result<()> {
        // Connections first, then the pool.
        let pool = Pool::new(get_opts());
        let conns = try_join_all((0..10).map(|_| pool.get_conn()))
            .await
            .unwrap();
        drop(conns);
        drop(pool);

        // The pool first, then connections.
        let pool = Pool::new(get_opts());
        let conns = try_join_all((0..10).map(|_| pool.get_conn()))
            .await
            .unwrap();
        drop(pool);
        drop(conns);
        Ok(())
    }

    /// Dropping a pooled connection during unwinding must not cause a second
    /// panic: the payload caught below must be the original one.
    #[test]
    fn drop_impl_for_conn_should_not_panic_within_unwind() {
        use tokio::runtime;

        const PANIC_MESSAGE: &str = "ORIGINAL_PANIC";

        let result = std::panic::catch_unwind(|| {
            runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap()
                .block_on(async {
                    let pool = Pool::new(get_opts());
                    let _conn = pool.get_conn().await.unwrap();
                    std::panic::panic_any(PANIC_MESSAGE);
                });
        });

        assert_eq!(
            *result.unwrap_err().downcast::<&str>().unwrap(),
            "ORIGINAL_PANIC",
        );
    }

    /// Dropping the runtime while a spawned task still owns a pooled
    /// connection must not panic.
    #[test]
    fn should_not_panic_on_unclean_shutdown() {
        // Repeated several times.
        for _ in 0..10 {
            let rt = tokio::runtime::Runtime::new().unwrap();
            let (tx, rx) = tokio::sync::oneshot::channel();
            rt.block_on(async move {
                let pool = Pool::new(get_opts());
                let mut c = pool.get_conn().await.unwrap();
                // This task holds the connection until it is signalled.
                tokio::spawn(async move {
                    let _ = rx.await;
                    let _ = c.query_drop("SELECT 1").await;
                });
            });
            // The runtime is dropped before the task is signalled.
            drop(rt);
            let _ = tx.send(());
        }
    }

    /// `disconnect` must complete while a spawned task still owns a pooled
    /// connection, once that task is allowed to finish.
    #[test]
    fn should_perform_clean_shutdown() {
        // Repeated several times.
        for _ in 0..10 {
            let rt = tokio::runtime::Runtime::new().unwrap();
            let (tx, rx) = tokio::sync::oneshot::channel();
            let jh = rt.spawn(async move {
                let pool = Pool::new(get_opts());
                let mut c = pool.get_conn().await.unwrap();
                tokio::spawn(async move {
                    let _ = rx.await;
                    let _ = c.query_drop("SELECT 1").await;
                });
                let _ = pool.disconnect().await;
            });
            let _ = tx.send(());
            rt.block_on(jh).unwrap();
        }
    }

    /// A query result that is dropped with a pending error in a multi-result
    /// set must not break the single connection of the pool.
    #[tokio::test]
    async fn issue_126_should_cleanup_errors_in_multiresult_sets() -> super::Result<()> {
        let pool_constraints = PoolConstraints::new(0, 1).unwrap();
        let pool_opts = PoolOpts::default().with_constraints(pool_constraints);

        let pool = Pool::new(get_opts().pool_opts(pool_opts));

        for _ in 0u8..100 {
            // The second statement is invalid on purpose.
            pool.get_conn()
                .await?
                .query_iter("DO '42'; BLABLA;")
                .await?;
        }

        Ok(())
    }

    /// The same server connection must be reused even though every use
    /// leaves an unconsumed error behind.
    #[tokio::test]
    async fn should_ignore_non_fatal_errors_while_returning_to_a_pool() -> super::Result<()> {
        let pool = pool_with_one_connection();
        let id = pool.get_conn().await?.id();

        for _ in 0u8..10 {
            // The second statement is invalid on purpose.
            let mut conn = pool.get_conn().await?;
            conn.query_iter("DO '42'; BLABLA;").await?;
            assert_eq!(id, conn.id());
        }

        Ok(())
    }

    /// A `get_conn` future that timed out must not leave its waker in the
    /// waitlist, and the wait metric must agree with that.
    #[tokio::test]
    async fn should_remove_waker_of_cancelled_task() {
        let pool = pool_with_one_connection();
        let only_conn = pool.get_conn().await.unwrap();

        // The only connection is taken, so this request can only time out.
        let join_handle = tokio::spawn(timeout(Duration::from_secs(1), pool.get_conn()));

        sleep(Duration::from_secs(2)).await;

        match join_handle.await.unwrap() {
            Err(_elapsed) => (),
            _ => panic!("unexpected Ok()"),
        }
        drop(only_conn);

        assert_eq!(0, pool.inner.exchange.lock().unwrap().waiting.queue.len());
        // The metric must match the (empty) waiting queue.
        assert_eq!(
            0,
            pool.metrics()
                .active_wait_requests
                .load(std::sync::atomic::Ordering::Relaxed)
        );
    }

    /// Cancelling operations on pooled connections at random points must
    /// not break subsequent operations (they `unwrap` their results).
    #[tokio::test]
    async fn should_work_if_pooled_connection_operation_is_cancelled() -> super::Result<()> {
        let pool = Pool::new(get_opts());

        // Warm up.
        join_all((0..10).map(|_| pool.get_conn())).await;

        // Some operation on a pooled connection.
        async fn op(pool: &Pool) {
            let _: Option<Row> = pool
                .get_conn()
                .await
                .unwrap()
                .exec_first("SELECT ?, ?", (42, "foo"))
                .await
                .unwrap();
        }

        // Measure how long the operation takes at most.
        let mut max_delay = 0_u128;
        for _ in 0..10_usize {
            let start = std::time::Instant::now();
            op(&pool).await;
            max_delay = std::cmp::max(max_delay, start.elapsed().as_micros());
        }

        for _ in 0_usize..128 {
            let fut = select_all((0_usize..5).map(|_| op(&pool).boxed()));

            // Race the operations against a random delay below `max_delay`,
            // so that they get cancelled somewhere in the middle.
            let delay_micros = rand::random::<u128>() % max_delay;
            select(
                sleep(Duration::from_micros(delay_micros as u64)).boxed(),
                fut,
            )
            .await;

            // Give the connections some time to return to the pool.
            sleep(Duration::from_millis(100)).await;
        }
        Ok(())
    }

    /// Unit test of `Waitlist`: removal by id and pop order (the smallest
    /// wrapped number first, `QUEUE_END_ID` last).
    #[test]
    fn waitlist_integrity() {
        // A waker that does nothing; only its presence in the queue matters.
        const DATA: *const () = &();
        const NOOP_CLONE_FN: unsafe fn(*const ()) -> RawWaker = |_| RawWaker::new(DATA, &RW_VTABLE);
        const NOOP_FN: unsafe fn(*const ()) = |_| {};
        static RW_VTABLE: RawWakerVTable =
            RawWakerVTable::new(NOOP_CLONE_FN, NOOP_FN, NOOP_FN, NOOP_FN);
        let w = unsafe { Waker::from_raw(RawWaker::new(DATA, &RW_VTABLE)) };

        let mut waitlist = Waitlist::default();
        assert_eq!(0, waitlist.queue.len());

        waitlist.push(w.clone(), QueueId(Reverse(4)));
        waitlist.push(w.clone(), QueueId(Reverse(2)));
        waitlist.push(w.clone(), QueueId(Reverse(8)));
        waitlist.push(w.clone(), QUEUE_END_ID);
        waitlist.push(w.clone(), QueueId(Reverse(10)));

        waitlist.remove(QueueId(Reverse(8)));

        assert_eq!(4, waitlist.queue.len());

        let (_, id) = waitlist.queue.pop().unwrap();
        assert_eq!(2, id.0 .0);
        let (_, id) = waitlist.queue.pop().unwrap();
        assert_eq!(4, id.0 .0);
        let (_, id) = waitlist.queue.pop().unwrap();
        assert_eq!(10, id.0 .0);
        let (_, id) = waitlist.queue.pop().unwrap();
        assert_eq!(QUEUE_END_ID, id);

        assert_eq!(0, waitlist.queue.len());
    }

    /// Connections created one second apart with an absolute TTL of two
    /// seconds must disappear one after another.
    #[tokio::test]
    async fn check_absolute_connection_ttl() -> super::Result<()> {
        let constraints = PoolConstraints::new(1, 3).unwrap();
        let pool_opts = PoolOpts::default()
            .with_constraints(constraints)
            .with_inactive_connection_ttl(Duration::from_secs(99))
            .with_ttl_check_interval(Duration::from_secs(1))
            .with_abs_conn_ttl(Some(Duration::from_secs(2)));

        let pool = Pool::new(get_opts().pool_opts(pool_opts));

        let conn_ttl0 = pool.get_conn().await?;
        sleep(Duration::from_millis(1000)).await;
        let conn_ttl1 = pool.get_conn().await?;
        sleep(Duration::from_millis(1000)).await;
        let conn_ttl2 = pool.get_conn().await?;

        drop(conn_ttl0);
        drop(conn_ttl1);
        drop(conn_ttl2);
        assert_eq!(ex_field!(pool, exist), 3);

        sleep(Duration::from_millis(1500)).await;
        assert_eq!(ex_field!(pool, exist), 2);

        sleep(Duration::from_millis(1000)).await;
        assert_eq!(ex_field!(pool, exist), 1);

        // The count goes even below the minimum pool size.
        sleep(Duration::from_millis(1000)).await;
        assert_eq!(ex_field!(pool, exist), 0);

        Ok(())
    }

    /// When a pending `get_conn` future is polled twice with different
    /// wakers, only the waker of the last poll must be woken.
    #[tokio::test]
    async fn save_last_waker() {
        let pool = pool_with_one_connection();

        // Take the only connection so that the next request stays pending.
        let conn = pool.get_conn().await.unwrap();
        let mut pending_fut = pin!(pool.get_conn());

        // Builds a waker together with a flag that is set when it is woken;
        // the waker panics if it is woken twice.
        let build_waker = || {
            let called = Arc::new(OnceLock::new());
            let called2 = called.clone();
            let waker = waker_fn(move || called2.set(()).unwrap());
            (called, waker)
        };

        let mut assert_pending = |waker| {
            let mut context = Context::from_waker(&waker);
            let p = pending_fut.as_mut().poll(&mut context);
            assert!(matches!(p, Poll::Pending));
        };

        let (first_called, waker) = build_waker();
        assert_pending(waker);

        let (second_called, waker) = build_waker();
        assert_pending(waker);

        drop(conn);

        // Wait for the second waker; the first one must never be woken.
        while second_called.get().is_none() {
            assert!(first_called.get().is_none());
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        assert!(first_called.get().is_none());
    }

    /// Pending requests must be served in the order they were queued.
    #[tokio::test]
    async fn check_priorities() -> super::Result<()> {
        let pool = pool_with_one_connection();

        let queue_len = || {
            let exchange = pool.inner.exchange.lock().unwrap();
            exchange.waiting.queue.len()
        };

        // Take the only connection so that further requests are queued.
        let conn = pool.get_conn().await.unwrap();

        // Creates a `get_conn` future, polls it once (it must be pending)
        // and returns it as a shared future.
        #[allow(clippy::async_yields_async)]
        let get_pending = || async {
            let fut = async {
                pool.get_conn().await.unwrap();
            }
            .shared();
            let p = poll!(fut.clone());
            assert!(matches!(p, Poll::Pending));
            fut
        };

        let fut1 = get_pending().await;
        let fut2 = get_pending().await;

        // Both futures are queued.
        assert_eq!(queue_len(), 2);

        // Release the connection and wait until one waiter left the queue.
        drop(conn); while queue_len() != 1 {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        // `fut2` is listed first in the select, but `fut1` must win.
        let Either::Right((_, fut2)) = select(fut2, fut1).await else {
            panic!("wrong future");
        };

        assert_eq!(queue_len(), 1);

        // `fut2` is still pending and still queued; awaiting it afterwards
        // must empty the queue.
        let p = poll!(fut2.clone());
        assert!(matches!(p, Poll::Pending));
        assert_eq!(queue_len(), 1); fut2.await;
        assert_eq!(queue_len(), 0);

        // A new request is expected to be pending (and queued) at first..
        let fut3 = get_pending().await;
        assert_eq!(queue_len(), 1);

        // ..but it is fine to await it.
        fut3.await;

        Ok(())
    }

    // Benchmarks; compiled only with the `nightly` feature.
    #[cfg(feature = "nightly")]
    mod bench {
        use futures_util::future::{FutureExt, TryFutureExt};
        use tokio::runtime::Runtime;

        use crate::{prelude::Queryable, test_misc::get_opts, Pool, PoolConstraints, PoolOpts};
        use std::time::Duration;

        /// Measures `get_conn` followed by a ping.
        #[bench]
        fn get_conn(bencher: &mut test::Bencher) {
            let mut runtime = Runtime::new().unwrap();
            let pool = Pool::new(get_opts());

            bencher.iter(|| {
                let fut = pool
                    .get_conn()
                    .and_then(|mut conn| async { conn.ping().await.map(|_| conn) });
                runtime.block_on(fut).unwrap();
            });

            runtime.block_on(pool.disconnect()).unwrap();
        }

        /// Measures `get_conn` on a pool constrained to 0..=1 connections
        /// with an inactive-connection TTL of one second.
        #[bench]
        fn new_conn_on_pool_soft_boundary(bencher: &mut test::Bencher) {
            let mut runtime = Runtime::new().unwrap();

            let pool_constraints = PoolConstraints::new(0, 1).unwrap();
            let pool_opts = PoolOpts::default()
                .with_constraints(pool_constraints)
                .with_inactive_connection_ttl(Duration::from_secs(1));

            let pool = Pool::new(get_opts().pool_opts(pool_opts));

            bencher.iter(|| {
                let fut = pool.get_conn().map(drop);
                runtime.block_on(fut);
            });

            runtime.block_on(pool.disconnect()).unwrap();
        }
    }
}