1use futures_util::FutureExt;
10use keyed_priority_queue::KeyedPriorityQueue;
11use tokio::sync::mpsc;
12
13use std::{
14 borrow::Borrow,
15 cmp::Reverse,
16 collections::VecDeque,
17 hash::{Hash, Hasher},
18 str::FromStr,
19 sync::{atomic, Arc, Mutex},
20 task::{Context, Poll, Waker},
21 time::{Duration, Instant},
22};
23
24use crate::{
25 conn::{pool::futures::*, Conn},
26 error::*,
27 opts::{Opts, PoolOpts},
28 queryable::transaction::{Transaction, TxOpts},
29};
30
31pub use metrics::Metrics;
32
33mod recycler;
34pub mod futures;
36mod metrics;
37mod ttl_check_inerval;
38
39#[derive(Debug)]
41struct IdlingConn {
42 since: Instant,
44 conn: Conn,
46}
47
48impl IdlingConn {
49 fn expired(&self) -> bool {
51 self.conn
52 .inner
53 .ttl_deadline
54 .map(|t| Instant::now() > t)
55 .unwrap_or_default()
56 }
57
58 fn elapsed(&self) -> Duration {
60 self.since.elapsed()
61 }
62}
63
64impl From<Conn> for IdlingConn {
65 fn from(conn: Conn) -> Self {
66 Self {
67 since: Instant::now(),
68 conn,
69 }
70 }
71}
72
73#[derive(Debug)]
78struct Exchange {
79 waiting: Waitlist,
80 available: VecDeque<IdlingConn>,
81 exist: usize,
82 recycler: Option<(mpsc::UnboundedReceiver<Option<Conn>>, PoolOpts)>,
84}
85
86impl Exchange {
87 fn spawn_futures_if_needed(&mut self, inner: &Arc<Inner>) {
90 use recycler::Recycler;
91 use ttl_check_inerval::TtlCheckInterval;
92 if let Some((dropped, pool_opts)) = self.recycler.take() {
93 tokio::spawn(Recycler::new(pool_opts.clone(), inner.clone(), dropped));
95
96 if pool_opts.inactive_connection_ttl() > Duration::ZERO
99 || pool_opts.abs_conn_ttl().is_some()
100 {
101 tokio::spawn(TtlCheckInterval::new(pool_opts, inner.clone()));
102 }
103 }
104 }
105}
106
107#[derive(Default, Debug)]
108struct Waitlist {
109 queue: KeyedPriorityQueue<QueuedWaker, QueueId>,
110}
111
112impl Waitlist {
113 fn push(&mut self, waker: Waker, queue_id: QueueId) -> bool {
114 self.remove(queue_id);
126 self.queue
127 .push(QueuedWaker { queue_id, waker }, queue_id)
128 .is_none()
129 }
130
131 fn pop(&mut self) -> Option<Waker> {
132 match self.queue.pop() {
133 Some((qw, _)) => Some(qw.waker),
134 None => None,
135 }
136 }
137
138 fn remove(&mut self, id: QueueId) -> bool {
139 self.queue.remove(&id).is_some()
140 }
141
142 fn peek_id(&mut self) -> Option<QueueId> {
143 self.queue.peek().map(|(qw, _)| qw.queue_id)
144 }
145}
146
/// The lowest possible priority: wraps `u64::MAX`, which sorts last under `Reverse`.
const QUEUE_END_ID: QueueId = QueueId(Reverse(u64::MAX));

/// Identifier of a waiter in the pool's waitlist.
///
/// The counter is wrapped in [`Reverse`], so an id created earlier (smaller counter)
/// compares as *greater* than one created later.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub(crate) struct QueueId(Reverse<u64>);

impl QueueId {
    /// Returns a fresh id taken from a process-wide, monotonically increasing counter.
    fn next() -> Self {
        static NEXT_QUEUE_ID: atomic::AtomicU64 = atomic::AtomicU64::new(0);
        Self(Reverse(
            NEXT_QUEUE_ID.fetch_add(1, atomic::Ordering::SeqCst),
        ))
    }
}
159
160#[derive(Debug)]
161struct QueuedWaker {
162 queue_id: QueueId,
163 waker: Waker,
164}
165
166impl Eq for QueuedWaker {}
167
168impl Borrow<QueueId> for QueuedWaker {
169 fn borrow(&self) -> &QueueId {
170 &self.queue_id
171 }
172}
173
174impl PartialEq for QueuedWaker {
175 fn eq(&self, other: &Self) -> bool {
176 self.queue_id == other.queue_id
177 }
178}
179
180impl Hash for QueuedWaker {
181 fn hash<H: Hasher>(&self, state: &mut H) {
182 self.queue_id.hash(state)
183 }
184}
185
/// State shared between all clones of a [`Pool`] and the background futures it spawns.
#[derive(Debug)]
pub struct Inner {
    /// Pool metrics; handed out by `Pool::metrics`.
    metrics: Arc<Metrics>,
    /// When set, `Pool::poll_new_conn` fails with `DriverError::PoolDisconnected`.
    close: atomic::AtomicBool,
    /// Consulted by `Pool::send_to_recycler` when the drop channel has no receiver.
    closed: atomic::AtomicBool,
    /// Waitlist, idle connections and connection counter.
    exchange: Mutex<Exchange>,
}
194
/// Connection pool handle.
///
/// Cloning is cheap in the sense that clones share the same `Inner` through an `Arc`.
#[derive(Debug, Clone)]
pub struct Pool {
    /// Options used to open new connections and to read the pool options.
    opts: Opts,
    /// State shared with every clone of this pool.
    inner: Arc<Inner>,
    /// Sender side of the channel through which returned connections are handed over;
    /// the receiver side is stored in `Exchange::recycler` and passed to the `Recycler`.
    drop: mpsc::UnboundedSender<Option<Conn>>,
}
209
210impl Pool {
211 pub fn new<O>(opts: O) -> Pool
217 where
218 Opts: TryFrom<O>,
219 <Opts as TryFrom<O>>::Error: std::error::Error,
220 {
221 let opts = Opts::try_from(opts).unwrap();
222 let pool_opts = opts.pool_opts().clone();
223 let (tx, rx) = mpsc::unbounded_channel();
224 Pool {
225 opts,
226 inner: Arc::new(Inner {
227 close: false.into(),
228 closed: false.into(),
229 metrics: Arc::new(Metrics::default()),
230 exchange: Mutex::new(Exchange {
231 available: VecDeque::with_capacity(pool_opts.constraints().max()),
232 waiting: Waitlist::default(),
233 exist: 0,
234 recycler: Some((rx, pool_opts)),
235 }),
236 }),
237 drop: tx,
238 }
239 }
240
241 pub fn metrics(&self) -> Arc<Metrics> {
243 self.inner.metrics.clone()
244 }
245
246 pub fn from_url<T: AsRef<str>>(url: T) -> Result<Pool> {
248 let opts = Opts::from_str(url.as_ref())?;
249 Ok(Pool::new(opts))
250 }
251
252 pub fn get_conn(&self) -> GetConn {
254 let reset_connection = self.opts.pool_opts().reset_connection();
255 GetConn::new(self, reset_connection)
256 }
257
258 pub async fn start_transaction(&self, options: TxOpts) -> Result<Transaction<'static>> {
260 let conn = self.get_conn().await?;
261 Transaction::new(conn, options).await
262 }
263
    /// Consumes this pool handle and returns the future that disconnects the pool.
    pub fn disconnect(self) -> DisconnectPool {
        DisconnectPool::new(self)
    }
271
    /// Returns `conn` to the pool by handing it to the recycler.
    ///
    /// Called from `Conn`'s `Drop` impl, i.e. from a synchronous context, so the
    /// connection is only forwarded here.
    fn return_conn(&mut self, conn: Conn) {
        self.send_to_recycler(conn);
    }
278
279 fn send_to_recycler(&self, conn: Conn) {
280 if let Err(conn) = self.drop.send(Some(conn)) {
281 let conn = conn.0.unwrap();
282
283 if !self.inner.closed.load(atomic::Ordering::SeqCst) {
286 assert!(conn.inner.pool.is_none());
289 drop(conn);
290 } else {
291 unreachable!("Recycler exited while connections still exist");
292 }
293 }
294 }
295
296 pub(super) fn cancel_connection(&self) {
301 let mut exchange = self.inner.exchange.lock().unwrap();
302 exchange.exist -= 1;
303 self.inner
304 .metrics
305 .create_failed
306 .fetch_add(1, atomic::Ordering::Relaxed);
307 if let Some(w) = exchange.waiting.pop() {
309 w.wake();
310 }
311 }
312
    /// Polls the pool for a connection on behalf of the waiter identified by `queue_id`.
    ///
    /// Outcomes:
    /// * `Poll::Ready(Err(..))` with `DriverError::PoolDisconnected` if the `close` flag is set;
    /// * `Poll::Pending` if another waiter with a priority at least as high is queued, or if
    ///   no idle connection is usable and `exist` has reached the configured maximum — in
    ///   both cases the waker from `cx` is stored in the waitlist;
    /// * `Poll::Ready(Ok(GetConnInner::Checking(..)))` if a non-expired idle connection was
    ///   taken from `available`; the contained future checks the stream before yielding it;
    /// * `Poll::Ready(Ok(GetConnInner::Connecting(..)))` if a new connection is being opened.
    fn poll_new_conn(
        &mut self,
        cx: &mut Context<'_>,
        queue_id: QueueId,
    ) -> Poll<Result<GetConnInner>> {
        // The `close` flag is read while the exchange lock is held.
        let mut exchange = self.inner.exchange.lock().unwrap();

        if self.inner.close.load(atomic::Ordering::Acquire) {
            return Err(Error::Driver(DriverError::PoolDisconnected)).into();
        }

        exchange.spawn_futures_if_needed(&self.inner);

        // `QueueId` wraps `Reverse<u64>`, so "greater" means "created earlier".
        let highest = if let Some(cur) = exchange.waiting.peek_id() {
            queue_id > cur
        } else {
            true
        };

        // Someone with at least our priority is already queued: (re-)register and wait.
        if !highest {
            if exchange.waiting.push(cx.waker().clone(), queue_id) {
                self.inner
                    .metrics
                    .active_wait_requests
                    .fetch_add(1, atomic::Ordering::Relaxed);
            }
            return Poll::Pending;
        }

        // `since` is only read when the `hdrhistogram` feature is enabled.
        #[allow(unused_variables)]
        while let Some(IdlingConn { mut conn, since }) = exchange.available.pop_back() {
            self.inner
                .metrics
                .connections_in_pool
                .fetch_sub(1, atomic::Ordering::Relaxed);

            if !conn.expired() {
                #[cfg(feature = "hdrhistogram")]
                self.inner
                    .metrics
                    .connection_idle_duration
                    .lock()
                    .unwrap()
                    .saturating_record(since.elapsed().as_micros() as u64);
                #[cfg(feature = "hdrhistogram")]
                let metrics = self.metrics();
                conn.inner.active_since = Instant::now();
                return Poll::Ready(Ok(GetConnInner::Checking(
                    async move {
                        conn.stream_mut()?.check().await?;
                        #[cfg(feature = "hdrhistogram")]
                        metrics
                            .check_duration
                            .lock()
                            .unwrap()
                            .saturating_record(
                                conn.inner.active_since.elapsed().as_micros() as u64
                            );
                        Ok(conn)
                    }
                    .boxed(),
                )));
            } else {
                // Expired idle connections are handed to the recycler; keep looking.
                self.send_to_recycler(conn);
            }
        }

        // No usable idle connection: open a new one if the upper bound allows it.
        if exchange.exist < self.opts.pool_opts().constraints().max() {
            exchange.exist += 1;

            self.inner
                .metrics
                .connection_count
                .fetch_add(1, atomic::Ordering::Relaxed);

            let opts = self.opts.clone();
            #[cfg(feature = "hdrhistogram")]
            let metrics = self.metrics();

            return Poll::Ready(Ok(GetConnInner::Connecting(
                async move {
                    let conn = Conn::new(opts).await;
                    #[cfg(feature = "hdrhistogram")]
                    if let Ok(conn) = &conn {
                        metrics
                            .connect_duration
                            .lock()
                            .unwrap()
                            .saturating_record(
                                conn.inner.active_since.elapsed().as_micros() as u64
                            );
                    }
                    conn
                }
                .boxed(),
            )));
        }

        // The pool is exhausted: queue up and wait to be woken.
        if exchange.waiting.push(cx.waker().clone(), queue_id) {
            self.inner
                .metrics
                .active_wait_requests
                .fetch_add(1, atomic::Ordering::Relaxed);
        }
        Poll::Pending
    }
429
430 fn unqueue(&self, queue_id: QueueId) {
431 let mut exchange = self.inner.exchange.lock().unwrap();
432 if exchange.waiting.remove(queue_id) {
433 self.inner
434 .metrics
435 .active_wait_requests
436 .fetch_sub(1, atomic::Ordering::Relaxed);
437 }
438 }
439}
440
441impl Drop for Conn {
442 fn drop(&mut self) {
443 self.inner.infile_handler = None;
444
445 if std::thread::panicking() {
446 if let Some(pool) = self.inner.pool.take() {
448 pool.cancel_connection();
449 }
450
451 return;
452 }
453
454 if let Some(mut pool) = self.inner.pool.take() {
455 pool.return_conn(self.take());
456 } else if self.inner.stream.is_some() && !self.inner.disconnected {
457 crate::conn::disconnect(self.take());
458 }
459 }
460}
461
462#[cfg(test)]
463mod test {
464 use futures_util::{
465 future::{join_all, select, select_all, try_join_all, Either},
466 poll, try_join, FutureExt,
467 };
468 use tokio::time::{sleep, timeout};
469 use waker_fn::waker_fn;
470
471 use std::{
472 cmp::Reverse,
473 future::Future,
474 pin::pin,
475 sync::{Arc, OnceLock},
476 task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
477 time::Duration,
478 };
479
480 use crate::{
481 conn::pool::{Pool, QueueId, Waitlist, QUEUE_END_ID},
482 opts::PoolOpts,
483 prelude::*,
484 test_misc::get_opts,
485 PoolConstraints, Row, TxOpts, Value,
486 };
487
    // Reads `$field` of the exchange of the pool that `$conn` is attached to.
    // Panics if the connection has no pool.
    macro_rules! conn_ex_field {
        ($conn:expr, $field:tt) => {
            ex_field!($conn.inner.pool.as_ref().unwrap(), $field)
        };
    }
493
    // Locks the exchange of `$pool` and accesses `$field` on it.
    // The lock guard is a temporary of the enclosing expression.
    macro_rules! ex_field {
        ($pool:expr, $field:tt) => {
            $pool.inner.exchange.lock().unwrap().$field
        };
    }
499
500 fn pool_with_one_connection() -> Pool {
501 let pool_opts = PoolOpts::new().with_constraints(PoolConstraints::new(1, 1).unwrap());
502 let opts = get_opts().pool_opts(pool_opts.clone());
503 Pool::new(opts)
504 }
505
506 #[tokio::test]
507 async fn should_opt_out_of_connection_reset() -> super::Result<()> {
508 let pool_opts = PoolOpts::new().with_constraints(PoolConstraints::new(1, 1).unwrap());
509 let opts = get_opts().pool_opts(pool_opts.clone());
510
511 let pool = Pool::new(opts.clone());
512
513 let mut conn = pool.get_conn().await.unwrap();
514 assert_eq!(
515 conn.query_first::<Value, _>("SELECT @foo").await?.unwrap(),
516 Value::NULL
517 );
518 conn.query_drop("SET @foo = 'foo'").await?;
519 assert_eq!(
520 conn.query_first::<String, _>("SELECT @foo").await?.unwrap(),
521 "foo",
522 );
523 drop(conn);
524
525 conn = pool.get_conn().await.unwrap();
526 assert_eq!(
527 conn.query_first::<Value, _>("SELECT @foo").await?.unwrap(),
528 Value::NULL
529 );
530 conn.query_drop("SET @foo = 'foo'").await?;
531 conn.reset_connection(false);
532 drop(conn);
533
534 conn = pool.get_conn().await.unwrap();
535 assert_eq!(
536 conn.query_first::<String, _>("SELECT @foo").await?.unwrap(),
537 "foo",
538 );
539 drop(conn);
540 pool.disconnect().await.unwrap();
541
542 let pool = Pool::new(opts.pool_opts(pool_opts.with_reset_connection(false)));
543 conn = pool.get_conn().await.unwrap();
544 conn.query_drop("SET @foo = 'foo'").await?;
545 drop(conn);
546 conn = pool.get_conn().await.unwrap();
547 assert_eq!(
548 conn.query_first::<String, _>("SELECT @foo").await?.unwrap(),
549 "foo",
550 );
551 drop(conn);
552 pool.disconnect().await
553 }
554
555 #[test]
556 fn should_not_hang() -> super::Result<()> {
557 pub struct Database {
558 pool: Pool,
559 }
560
561 impl Database {
562 pub async fn disconnect(self) -> super::Result<()> {
563 self.pool.disconnect().await?;
564 Ok(())
565 }
566 }
567
568 let runtime = tokio::runtime::Runtime::new().unwrap();
569 let database = Database {
570 pool: Pool::new(get_opts()),
571 };
572 runtime.block_on(database.disconnect())
573 }
574
575 #[tokio::test]
576 async fn should_track_conn_if_disconnected_outside_of_a_pool() -> super::Result<()> {
577 let pool = Pool::new(get_opts());
578 let conn = pool.get_conn().await?;
579 conn.disconnect().await?;
580 pool.disconnect().await?;
581 Ok(())
582 }
583
584 #[tokio::test]
585 async fn should_connect() -> super::Result<()> {
586 let pool = Pool::new(crate::Opts::from(get_opts()));
587 pool.get_conn().await?.ping().await?;
588 pool.disconnect().await?;
589 Ok(())
590 }
591
    /// Kills pooled connections on the server side and checks that the pool still hands out
    /// a working connection afterwards. Runs once with the default options and once with
    /// `prefer_socket(false)`.
    #[tokio::test]
    async fn should_reconnect() -> super::Result<()> {
        // Separate, non-pooled connection used to issue the `KILL` statements.
        let mut master = crate::Conn::new(get_opts()).await?;

        async fn test(master: &mut crate::Conn, opts: crate::OptsBuilder) -> super::Result<()> {
            const NUM_CONNS: usize = 5;
            let pool = Pool::new(opts);

            // Futures that each check out a connection and ping it.
            let connections = (0..NUM_CONNS).map(|_| {
                async {
                    let mut conn = pool.get_conn().await?;
                    conn.ping().await?;
                    crate::Result::Ok(conn)
                }
                .boxed()
            });

            // Collect the connection ids; the connections themselves are dropped here.
            let ids = try_join_all(connections)
                .await?
                .into_iter()
                .map(|conn| conn.id())
                .collect::<Vec<_>>();

            // Presumably gives the recycler time to put the connections back — TODO confirm.
            sleep(Duration::from_millis(1000)).await;

            // Getting a connection must work while all of them are still alive.
            pool.get_conn().await?;

            for id in ids {
                master.query_drop(format!("KILL {}", id)).await?;
            }

            // The killed connections are still listed as available.
            assert_eq!(ex_field!(pool, available).len(), NUM_CONNS);

            sleep(Duration::from_millis(500)).await;

            let _conn = pool.get_conn().await?;

            // After a successful `get_conn` no idle connection is left in the pool.
            assert_eq!(ex_field!(pool, available).len(), 0);

            drop(_conn);
            pool.disconnect().await
        }

        println!("Check socket/pipe..");
        test(&mut master, get_opts()).await?;

        println!("Check tcp..");
        test(&mut master, get_opts().prefer_socket(false)).await?;

        master.disconnect().await
    }
651
652 #[tokio::test]
653 async fn should_reuse_connections() -> super::Result<()> {
654 let pool = pool_with_one_connection();
655 let mut conn = pool.get_conn().await?;
656
657 let server_version = conn.server_version();
658 let connection_id = conn.id();
659
660 for _ in 0..16 {
661 drop(conn);
662 conn = pool.get_conn().await?;
663 println!("CONN connection_id={}", conn.id());
664 assert!(conn.id() == connection_id || server_version < (5, 7, 2));
665 }
666
667 Ok(())
668 }
669
670 #[tokio::test]
671 #[ignore]
672 async fn can_handle_the_pressure() {
673 let pool = Pool::new(get_opts());
674 for _ in 0..10i32 {
675 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
676 for i in 0..10_000 {
677 let pool = pool.clone();
678 let tx = tx.clone();
679 tokio::spawn(async move {
680 let _ = pool.get_conn().await.unwrap();
681 tx.send(i).unwrap();
682 });
683 }
684 drop(tx);
685 while (rx.recv().await).is_some() {}
687 }
688 drop(pool);
689 }
690
691 #[tokio::test]
692 async fn should_start_transaction() -> super::Result<()> {
693 let pool = pool_with_one_connection();
694
695 "CREATE TABLE IF NOT EXISTS mysql.tmp(id int)"
696 .ignore(&pool)
697 .await?;
698 "DELETE FROM mysql.tmp".ignore(&pool).await?;
699
700 let mut tx = pool.start_transaction(TxOpts::default()).await?;
701 tx.exec_batch(
702 "INSERT INTO mysql.tmp (id) VALUES (?)",
703 vec![(1_u8,), (2_u8,)],
704 )
705 .await?;
706 tx.exec_drop("SELECT * FROM mysql.tmp", ()).await?;
707 drop(tx);
708 let row_opt = pool
709 .get_conn()
710 .await?
711 .query_first("SELECT COUNT(*) FROM mysql.tmp")
712 .await?;
713 assert_eq!(row_opt, Some((0u8,)));
714 pool.get_conn()
715 .await?
716 .query_drop("DROP TABLE mysql.tmp")
717 .await?;
718 pool.disconnect().await?;
719 Ok(())
720 }
721
    /// Idle connections above the lower pool bound must be removed once the inactivity TTL
    /// has passed and the TTL check has run.
    #[tokio::test]
    async fn should_check_inactive_connection_ttl() -> super::Result<()> {
        const POOL_MIN: usize = 5;
        const POOL_MAX: usize = 10;

        const INACTIVE_CONNECTION_TTL: Duration = Duration::from_millis(500);
        const TTL_CHECK_INTERVAL: Duration = Duration::from_secs(1);

        let constraints = PoolConstraints::new(POOL_MIN, POOL_MAX).unwrap();
        let pool_opts = PoolOpts::default()
            .with_constraints(constraints)
            .with_inactive_connection_ttl(INACTIVE_CONNECTION_TTL)
            .with_ttl_check_interval(TTL_CHECK_INTERVAL);

        let pool = Pool::new(get_opts().pool_opts(pool_opts));
        let pool_clone = pool.clone();
        // Fill the pool up to its upper bound.
        let conns = (0..POOL_MAX).map(|_| pool.get_conn()).collect::<Vec<_>>();

        let conns = try_join_all(conns).await?;

        assert_eq!(ex_field!(pool_clone, exist), POOL_MAX);
        drop(conns);

        // Presumably gives the recycler time to take the connections back — TODO confirm.
        sleep(Duration::from_millis(100)).await;

        // All connections are idle now.
        assert_eq!(ex_field!(pool_clone, available).len(), POOL_MAX);

        // Wait for one check interval plus some slack.
        sleep(TTL_CHECK_INTERVAL).await;

        sleep(Duration::from_millis(500)).await;

        // Only the lower bound of idle connections remains.
        assert_eq!(ex_field!(pool_clone, available).len(), POOL_MIN);

        Ok(())
    }
762
763 #[tokio::test]
764 async fn aa_should_hold_bounds2() -> super::Result<()> {
765 use std::cmp::min;
766
767 const POOL_MIN: usize = 5;
768 const POOL_MAX: usize = 10;
769
770 let constraints = PoolConstraints::new(POOL_MIN, POOL_MAX).unwrap();
771 let pool_opts = PoolOpts::default().with_constraints(constraints);
772
773 let pool = Pool::new(get_opts().pool_opts(pool_opts));
774 let pool_clone = pool.clone();
775 let conns = (0..POOL_MAX).map(|_| pool.get_conn()).collect::<Vec<_>>();
776
777 let mut conns = try_join_all(conns).await?;
778
779 assert_eq!(ex_field!(pool_clone, exist), POOL_MAX);
782
783 while !conns.is_empty() {
784 let _ = conns.pop();
786
787 sleep(Duration::from_millis(500)).await;
789
790 let dropped = POOL_MAX - conns.len();
801 let idle = min(dropped, POOL_MIN);
802 let expected = conns.len() + idle;
803
804 let have = ex_field!(pool_clone, exist);
806 assert_eq!(have, expected);
807 }
808
809 pool.disconnect().await?;
810 Ok(())
811 }
812
813 #[tokio::test]
814 async fn should_hold_bounds1() -> super::Result<()> {
815 let constraints = PoolConstraints::new(1, 2).unwrap();
816 let opts = get_opts().pool_opts(PoolOpts::default().with_constraints(constraints));
817 let pool = Pool::new(opts);
818 let pool_clone = pool.clone();
819
820 let (conn1, conn2) = try_join!(pool.get_conn(), pool.get_conn()).unwrap();
821
822 assert_eq!(conn_ex_field!(conn1, exist), 2);
823 assert_eq!(conn_ex_field!(conn1, available).len(), 0);
824
825 drop(conn1);
826 drop(conn2);
827 let conn1 = pool_clone.get_conn().await?;
830 assert_eq!(conn_ex_field!(conn1, available).len(), 0);
831
832 drop(conn1);
833
834 assert!(ex_field!(pool, available).len() <= 1);
837 pool.disconnect().await?;
838 Ok(())
839 }
840
841 #[tokio::test]
843 async fn should_hold_bounds_on_error() -> super::Result<()> {
844 let pool = Pool::new("mysql://255.255.255.255");
846
847 assert!(try_join!(pool.get_conn(), pool.get_conn()).is_err());
848 assert_eq!(ex_field!(pool, exist), 0);
849 Ok(())
850 }
851
    /// After the server-side `wait_timeout` has passed, `get_conn` must hand out a
    /// connection with a different connection id while `exist` stays at 1.
    ///
    /// NOTE: changes the global `wait_timeout` of the server and restores it at the end.
    #[tokio::test]
    async fn zz_should_check_wait_timeout_on_get_conn() -> super::Result<()> {
        let pool = Pool::new(get_opts());

        // Remember the original value and lower the global timeout to 3 seconds.
        let mut conn = pool.get_conn().await?;
        let wait_timeout_orig: Option<usize> = conn.query_first("SELECT @@wait_timeout").await?;
        conn.query_drop("SET GLOBAL wait_timeout = 3").await?;
        conn.disconnect().await?;

        // A new connection picks up the lowered timeout.
        let mut conn = pool.get_conn().await?;
        let wait_timeout: Option<usize> = conn.query_first("SELECT @@wait_timeout").await?;
        let id1: Option<usize> = conn.query_first("SELECT CONNECTION_ID()").await?;
        drop(conn);

        assert_eq!(wait_timeout, Some(3));
        assert_eq!(ex_field!(pool, exist), 1);

        // Sleep for longer than the 3-second timeout.
        sleep(Duration::from_secs(6)).await;

        let mut conn = pool.get_conn().await?;
        let id2: Option<usize> = conn.query_first("SELECT CONNECTION_ID()").await?;
        assert_eq!(ex_field!(pool, exist), 1);
        assert_ne!(id1, id2);

        // Restore the original global timeout.
        conn.exec_drop("SET GLOBAL wait_timeout = ?", (wait_timeout_orig,))
            .await?;
        drop(conn);

        pool.disconnect().await?;

        Ok(())
    }
884
885 #[tokio::test]
886 async fn droptest() -> super::Result<()> {
887 let pool = Pool::new(get_opts());
888 let conns = try_join_all((0..10).map(|_| pool.get_conn()))
889 .await
890 .unwrap();
891 drop(conns);
892 drop(pool);
893
894 let pool = Pool::new(get_opts());
895 let conns = try_join_all((0..10).map(|_| pool.get_conn()))
896 .await
897 .unwrap();
898 drop(pool);
899 drop(conns);
900 Ok(())
901 }
902
903 #[test]
904 fn drop_impl_for_conn_should_not_panic_within_unwind() {
905 use tokio::runtime;
906
907 const PANIC_MESSAGE: &str = "ORIGINAL_PANIC";
908
909 let result = std::panic::catch_unwind(|| {
910 runtime::Builder::new_current_thread()
911 .enable_all()
912 .build()
913 .unwrap()
914 .block_on(async {
915 let pool = Pool::new(get_opts());
916 let _conn = pool.get_conn().await.unwrap();
917 std::panic::panic_any(PANIC_MESSAGE);
918 });
919 });
920
921 assert_eq!(
922 *result.unwrap_err().downcast::<&str>().unwrap(),
923 "ORIGINAL_PANIC",
924 );
925 }
926
927 #[test]
928 fn should_not_panic_on_unclean_shutdown() {
929 for _ in 0..10 {
931 let rt = tokio::runtime::Runtime::new().unwrap();
932 let (tx, rx) = tokio::sync::oneshot::channel();
933 rt.block_on(async move {
934 let pool = Pool::new(get_opts());
935 let mut c = pool.get_conn().await.unwrap();
936 tokio::spawn(async move {
937 let _ = rx.await;
938 let _ = c.query_drop("SELECT 1").await;
939 });
940 });
941 drop(rt);
942 let _ = tx.send(());
944 }
945 }
946
947 #[test]
948 fn should_perform_clean_shutdown() {
949 for _ in 0..10 {
951 let rt = tokio::runtime::Runtime::new().unwrap();
952 let (tx, rx) = tokio::sync::oneshot::channel();
953 let jh = rt.spawn(async move {
954 let pool = Pool::new(get_opts());
955 let mut c = pool.get_conn().await.unwrap();
956 tokio::spawn(async move {
957 let _ = rx.await;
958 let _ = c.query_drop("SELECT 1").await;
959 });
960 let _ = pool.disconnect().await;
961 });
962 let _ = tx.send(());
963 rt.block_on(jh).unwrap();
964 }
965 }
966
967 #[tokio::test]
968 async fn issue_126_should_cleanup_errors_in_multiresult_sets() -> super::Result<()> {
969 let pool_constraints = PoolConstraints::new(0, 1).unwrap();
970 let pool_opts = PoolOpts::default().with_constraints(pool_constraints);
971
972 let pool = Pool::new(get_opts().pool_opts(pool_opts));
973
974 for _ in 0u8..100 {
975 pool.get_conn()
976 .await?
977 .query_iter("DO '42'; BLABLA;")
978 .await?;
979 }
980
981 Ok(())
982 }
983
984 #[tokio::test]
985 async fn should_ignore_non_fatal_errors_while_returning_to_a_pool() -> super::Result<()> {
986 let pool = pool_with_one_connection();
987 let id = pool.get_conn().await?.id();
988
989 for _ in 0u8..10 {
991 let mut conn = pool.get_conn().await?;
992 conn.query_iter("DO '42'; BLABLA;").await?;
993 assert_eq!(id, conn.id());
994 }
995
996 Ok(())
997 }
998
999 #[tokio::test]
1000 async fn should_remove_waker_of_cancelled_task() {
1001 let pool = pool_with_one_connection();
1002 let only_conn = pool.get_conn().await.unwrap();
1003
1004 let join_handle = tokio::spawn(timeout(Duration::from_secs(1), pool.get_conn()));
1005
1006 sleep(Duration::from_secs(2)).await;
1007
1008 match join_handle.await.unwrap() {
1009 Err(_elapsed) => (),
1010 _ => panic!("unexpected Ok()"),
1011 }
1012 drop(only_conn);
1013
1014 assert_eq!(0, pool.inner.exchange.lock().unwrap().waiting.queue.len());
1015 }
1016
1017 #[tokio::test]
1018 async fn should_work_if_pooled_connection_operation_is_cancelled() -> super::Result<()> {
1019 let pool = Pool::new(get_opts());
1020
1021 join_all((0..10).map(|_| pool.get_conn())).await;
1023
1024 async fn op(pool: &Pool) {
1026 let _: Option<Row> = pool
1027 .get_conn()
1028 .await
1029 .unwrap()
1030 .exec_first("SELECT ?, ?", (42, "foo"))
1031 .await
1032 .unwrap();
1033 }
1034
1035 let mut max_delay = 0_u128;
1037 for _ in 0..10_usize {
1038 let start = std::time::Instant::now();
1039 op(&pool).await;
1040 max_delay = std::cmp::max(max_delay, start.elapsed().as_micros());
1041 }
1042
1043 for _ in 0_usize..128 {
1044 let fut = select_all((0_usize..5).map(|_| op(&pool).boxed()));
1045
1046 let delay_micros = rand::random::<u128>() % max_delay;
1049 select(
1050 sleep(Duration::from_micros(delay_micros as u64)).boxed(),
1051 fut,
1052 )
1053 .await;
1054
1055 sleep(Duration::from_millis(100)).await;
1057 }
1058 Ok(())
1059 }
1060
    /// Pushes, removes and pops entries and checks that the waitlist yields the remaining
    /// ids in ascending numeric order, with `QUEUE_END_ID` last.
    #[test]
    fn waitlist_integrity() {
        // A waker whose vtable functions do nothing; `clone` yields an identical raw waker.
        const DATA: *const () = &();
        const NOOP_CLONE_FN: unsafe fn(*const ()) -> RawWaker = |_| RawWaker::new(DATA, &RW_VTABLE);
        const NOOP_FN: unsafe fn(*const ()) = |_| {};
        static RW_VTABLE: RawWakerVTable =
            RawWakerVTable::new(NOOP_CLONE_FN, NOOP_FN, NOOP_FN, NOOP_FN);
        // SAFETY: none of the vtable functions dereferences the data pointer, and
        // wake/wake_by_ref/drop are no-ops.
        let w = unsafe { Waker::from_raw(RawWaker::new(DATA, &RW_VTABLE)) };

        let mut waitlist = Waitlist::default();
        assert_eq!(0, waitlist.queue.len());

        waitlist.push(w.clone(), QueueId(Reverse(4)));
        waitlist.push(w.clone(), QueueId(Reverse(2)));
        waitlist.push(w.clone(), QueueId(Reverse(8)));
        waitlist.push(w.clone(), QUEUE_END_ID);
        waitlist.push(w.clone(), QueueId(Reverse(10)));

        waitlist.remove(QueueId(Reverse(8)));

        // Five pushed, one removed.
        assert_eq!(4, waitlist.queue.len());

        // Smaller numeric ids come out first.
        let (_, id) = waitlist.queue.pop().unwrap();
        assert_eq!(2, id.0 .0);
        let (_, id) = waitlist.queue.pop().unwrap();
        assert_eq!(4, id.0 .0);
        let (_, id) = waitlist.queue.pop().unwrap();
        assert_eq!(10, id.0 .0);
        let (_, id) = waitlist.queue.pop().unwrap();
        assert_eq!(QUEUE_END_ID, id);

        assert_eq!(0, waitlist.queue.len());
    }
1094
    /// Connections opened one second apart with an absolute TTL of two seconds must
    /// disappear from the pool one after another.
    #[tokio::test]
    async fn check_absolute_connection_ttl() -> super::Result<()> {
        let constraints = PoolConstraints::new(1, 3).unwrap();
        // The inactivity TTL is set high (99 s) while the absolute TTL is 2 s.
        let pool_opts = PoolOpts::default()
            .with_constraints(constraints)
            .with_inactive_connection_ttl(Duration::from_secs(99))
            .with_ttl_check_interval(Duration::from_secs(1))
            .with_abs_conn_ttl(Some(Duration::from_secs(2)));

        let pool = Pool::new(get_opts().pool_opts(pool_opts));

        // Open three connections, one second apart.
        let conn_ttl0 = pool.get_conn().await?;
        sleep(Duration::from_millis(1000)).await;
        let conn_ttl1 = pool.get_conn().await?;
        sleep(Duration::from_millis(1000)).await;
        let conn_ttl2 = pool.get_conn().await?;

        drop(conn_ttl0);
        drop(conn_ttl1);
        drop(conn_ttl2);
        assert_eq!(ex_field!(pool, exist), 3);

        sleep(Duration::from_millis(1500)).await;
        assert_eq!(ex_field!(pool, exist), 2);

        sleep(Duration::from_millis(1000)).await;
        assert_eq!(ex_field!(pool, exist), 1);

        // The last connection goes as well, although the lower pool bound is 1.
        sleep(Duration::from_millis(1000)).await;
        assert_eq!(ex_field!(pool, exist), 0);

        Ok(())
    }
1129
    /// When a pending `get_conn` future is polled twice with different wakers, only the
    /// waker of the most recent poll may be woken.
    #[tokio::test]
    async fn save_last_waker() {
        let pool = pool_with_one_connection();

        // Hold the only connection so that the next request has to wait.
        let conn = pool.get_conn().await.unwrap();
        let mut pending_fut = pin!(pool.get_conn());

        // Builds a waker together with a flag that is set when the waker is invoked;
        // the `unwrap` makes a second invocation of the same waker panic.
        let build_waker = || {
            let called = Arc::new(OnceLock::new());
            let called2 = called.clone();
            let waker = waker_fn(move || called2.set(()).unwrap());
            (called, waker)
        };

        // Polls the pending future with the given waker and expects `Pending`.
        let mut assert_pending = |waker| {
            let mut context = Context::from_waker(&waker);
            let p = pending_fut.as_mut().poll(&mut context);
            assert!(matches!(p, Poll::Pending));
        };

        let (first_called, waker) = build_waker();
        assert_pending(waker);

        let (second_called, waker) = build_waker();
        assert_pending(waker);

        // Releasing the connection should eventually wake the waiter.
        drop(conn);

        while second_called.get().is_none() {
            assert!(first_called.get().is_none());
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        assert!(first_called.get().is_none());
    }
1169
    /// Waiters must be served in the order in which they started waiting, and the waitlist
    /// length must track the number of pending requests.
    #[tokio::test]
    async fn check_priorities() -> super::Result<()> {
        let pool = pool_with_one_connection();

        // Current number of entries in the waitlist.
        let queue_len = || {
            let exchange = pool.inner.exchange.lock().unwrap();
            exchange.waiting.queue.len()
        };

        // Hold the only connection so that further requests have to wait.
        let conn = pool.get_conn().await.unwrap();

        // Creates a shared `get_conn` future, polls it once (expecting `Pending`) and
        // returns it for later use.
        #[allow(clippy::async_yields_async)]
        let get_pending = || async {
            let fut = async {
                pool.get_conn().await.unwrap();
            }
            .shared();
            let p = poll!(fut.clone());
            assert!(matches!(p, Poll::Pending));
            fut
        };

        let fut1 = get_pending().await;
        let fut2 = get_pending().await;

        assert_eq!(queue_len(), 2);

        // Release the connection and wait until one waiter has left the waitlist.
        drop(conn);
        while queue_len() != 1 {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        // `fut1` is the second argument of `select`, so `Either::Right` means it won.
        let Either::Right((_, fut2)) = select(fut2, fut1).await else {
            panic!("wrong future");
        };

        assert_eq!(queue_len(), 1);

        let p = poll!(fut2.clone());
        assert!(matches!(p, Poll::Pending));
        assert_eq!(queue_len(), 1);
        fut2.await;
        assert_eq!(queue_len(), 0);

        let fut3 = get_pending().await;
        assert_eq!(queue_len(), 1);

        fut3.await;

        Ok(())
    }
1234
    /// Benchmarks; only compiled when the `nightly` feature is enabled.
    #[cfg(feature = "nightly")]
    mod bench {
        use futures_util::future::{FutureExt, TryFutureExt};
        use tokio::runtime::Runtime;

        use crate::{prelude::Queryable, test_misc::get_opts, Pool, PoolConstraints, PoolOpts};
        use std::time::Duration;

        /// Measures getting a connection from the pool followed by a ping.
        #[bench]
        fn get_conn(bencher: &mut test::Bencher) {
            let mut runtime = Runtime::new().unwrap();
            let pool = Pool::new(get_opts());

            bencher.iter(|| {
                let fut = pool
                    .get_conn()
                    .and_then(|mut conn| async { conn.ping().await.map(|_| conn) });
                runtime.block_on(fut).unwrap();
            });

            runtime.block_on(pool.disconnect()).unwrap();
        }

        /// Measures `get_conn` on a pool with constraints `min = 0`, `max = 1` and an
        /// inactivity TTL of one second; the result of each call is dropped.
        #[bench]
        fn new_conn_on_pool_soft_boundary(bencher: &mut test::Bencher) {
            let mut runtime = Runtime::new().unwrap();

            let pool_constraints = PoolConstraints::new(0, 1).unwrap();
            let pool_opts = PoolOpts::default()
                .with_constraints(pool_constraints)
                .with_inactive_connection_ttl(Duration::from_secs(1));

            let pool = Pool::new(get_opts().pool_opts(pool_opts));

            bencher.iter(|| {
                let fut = pool.get_conn().map(drop);
                runtime.block_on(fut);
            });

            runtime.block_on(pool.disconnect()).unwrap();
        }
    }
1277}