mysql_async/conn/pool/
mod.rs

1// Copyright (c) 2016 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use futures_util::FutureExt;
10use keyed_priority_queue::KeyedPriorityQueue;
11use tokio::sync::mpsc;
12
13use std::{
14    borrow::Borrow,
15    cmp::Reverse,
16    collections::VecDeque,
17    hash::{Hash, Hasher},
18    str::FromStr,
19    sync::{atomic, Arc, Mutex},
20    task::{Context, Poll, Waker},
21    time::{Duration, Instant},
22};
23
24use crate::{
25    conn::{pool::futures::*, Conn},
26    error::*,
27    opts::{Opts, PoolOpts},
28    queryable::transaction::{Transaction, TxOpts},
29};
30
31pub use metrics::Metrics;
32
33mod recycler;
34// this is a really unfortunate name for a module
35pub mod futures;
36mod metrics;
37mod ttl_check_inerval;
38
/// Connection that is idling in the pool.
///
/// Pairs a [`Conn`] with the moment it was wrapped (see the `From<Conn>` impl), so the
/// time spent idle can be measured later.
#[derive(Debug)]
struct IdlingConn {
    /// The connection is idling since this `Instant`.
    since: Instant,
    /// Idling connection.
    conn: Conn,
}
47
48impl IdlingConn {
49    /// Returns true when this connection has a TTL and it elapsed.
50    fn expired(&self) -> bool {
51        self.conn
52            .inner
53            .ttl_deadline
54            .map(|t| Instant::now() > t)
55            .unwrap_or_default()
56    }
57
58    /// Returns duration elapsed since this connection is idling.
59    fn elapsed(&self) -> Duration {
60        self.since.elapsed()
61    }
62}
63
64impl From<Conn> for IdlingConn {
65    fn from(conn: Conn) -> Self {
66        Self {
67            since: Instant::now(),
68            conn,
69        }
70    }
71}
72
/// The exchange is where we track all connections as they come and go.
///
/// It is held under a single, non-asynchronous lock.
/// This is fine as long as we never do expensive work while holding the lock!
#[derive(Debug)]
struct Exchange {
    /// Wakers of connection requests that could not be served yet.
    waiting: Waitlist,
    /// Idle connections that can be handed out.
    available: VecDeque<IdlingConn>,
    /// Connection counter compared against the pool's `max` constraint; bumped when the
    /// pool decides to open a connection and decreased when one is cancelled.
    exist: usize,
    // only used to spawn the recycler the first time we're in async context
    recycler: Option<(mpsc::UnboundedReceiver<Option<Conn>>, PoolOpts)>,
}
85
86impl Exchange {
87    /// This function will spawn the recycler for this pool
88    /// as well as the ttl check interval if `inactive_connection_ttl` isn't `0`.
89    fn spawn_futures_if_needed(&mut self, inner: &Arc<Inner>) {
90        use recycler::Recycler;
91        use ttl_check_inerval::TtlCheckInterval;
92        if let Some((dropped, pool_opts)) = self.recycler.take() {
93            // Spawn the Recycler.
94            tokio::spawn(Recycler::new(pool_opts.clone(), inner.clone(), dropped));
95
96            // Spawn the ttl check interval if `inactive_connection_ttl` isn't `0` or
97            // connections have an absolute TTL.
98            if pool_opts.inactive_connection_ttl() > Duration::ZERO
99                || pool_opts.abs_conn_ttl().is_some()
100            {
101                tokio::spawn(TtlCheckInterval::new(pool_opts, inner.clone()));
102            }
103        }
104    }
105}
106
/// Priority queue of wakers belonging to connection requests that are waiting.
///
/// Entries are keyed by [`QueuedWaker`] (which compares and hashes by its `QueueId`) and
/// prioritized by that same `QueueId`.
#[derive(Default, Debug)]
struct Waitlist {
    queue: KeyedPriorityQueue<QueuedWaker, QueueId>,
}
111
112impl Waitlist {
113    fn push(&mut self, waker: Waker, queue_id: QueueId) -> bool {
114        // The documentation of Future::poll says:
115        //   Note that on multiple calls to poll, only the Waker from
116        //   the Context passed to the most recent call should be
117        //   scheduled to receive a wakeup.
118        //
119        // But the the documentation of KeyedPriorityQueue::push says:
120        //   Adds new element to queue if missing key or replace its
121        //   priority if key exists. In second case doesn’t replace key.
122        //
123        // This means we have to remove first to have the most recent
124        // waker in the queue.
125        self.remove(queue_id);
126        self.queue
127            .push(QueuedWaker { queue_id, waker }, queue_id)
128            .is_none()
129    }
130
131    fn pop(&mut self) -> Option<Waker> {
132        match self.queue.pop() {
133            Some((qw, _)) => Some(qw.waker),
134            None => None,
135        }
136    }
137
138    fn remove(&mut self, id: QueueId) -> bool {
139        self.queue.remove(&id).is_some()
140    }
141
142    fn peek_id(&mut self) -> Option<QueueId> {
143        self.queue.peek().map(|(qw, _)| qw.queue_id)
144    }
145}
146
/// The lowest-ordered `QueueId`: it wraps `u64::MAX`, and `Reverse` flips the comparison.
const QUEUE_END_ID: QueueId = QueueId(Reverse(u64::MAX));

/// Identifier of a queued connection request.
///
/// The wrapped counter is compared in reverse, so ids handed out earlier compare
/// greater than ids handed out later.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub(crate) struct QueueId(Reverse<u64>);

impl QueueId {
    /// Hands out the next id from a process-wide counter that starts at zero.
    fn next() -> Self {
        static COUNTER: atomic::AtomicU64 = atomic::AtomicU64::new(0);
        Self(Reverse(COUNTER.fetch_add(1, atomic::Ordering::SeqCst)))
    }
}
159
160#[derive(Debug)]
161struct QueuedWaker {
162    queue_id: QueueId,
163    waker: Waker,
164}
165
166impl Eq for QueuedWaker {}
167
168impl Borrow<QueueId> for QueuedWaker {
169    fn borrow(&self) -> &QueueId {
170        &self.queue_id
171    }
172}
173
174impl PartialEq for QueuedWaker {
175    fn eq(&self, other: &Self) -> bool {
176        self.queue_id == other.queue_id
177    }
178}
179
180impl Hash for QueuedWaker {
181    fn hash<H: Hasher>(&self, state: &mut H) {
182        self.queue_id.hash(state)
183    }
184}
185
/// Connection pool data.
///
/// Shared between all clones of a [`Pool`] through an `Arc`.
#[derive(Debug)]
pub struct Inner {
    /// Pool metrics; handed out by `Pool::metrics`.
    metrics: Arc<Metrics>,
    /// When set, requests for new connections fail with `PoolDisconnected`.
    close: atomic::AtomicBool,
    /// Consulted when a connection cannot be handed to the recycler.
    // NOTE(review): presumably set once the recycler has shut down cleanly — confirm in `recycler`.
    closed: atomic::AtomicBool,
    /// Connection bookkeeping, guarded by a synchronous mutex.
    exchange: Mutex<Exchange>,
}
194
/// Asynchronous pool of MySql connections.
///
/// Actually `Pool` is a shared reference, i.e. every clone will lead to the same instance
/// created with [`Pool::new`]. Also `Pool` satisfies `Send` and `Sync`, so you don't have to wrap
/// it into an `Arc` or `Mutex`.
///
/// Note that you will probably want to await [`Pool::disconnect`] before dropping the runtime, as
/// otherwise you may end up with a number of connections that are not cleanly terminated.
#[derive(Debug, Clone)]
pub struct Pool {
    /// Options used for new connections and for the pool's constraints.
    opts: Opts,
    /// State shared by every clone of this pool.
    inner: Arc<Inner>,
    /// Sending half of the channel through which returned connections reach the recycler.
    drop: mpsc::UnboundedSender<Option<Conn>>,
}
209
210impl Pool {
211    /// Creates a new pool of connections.
212    ///
213    /// # Panic
214    ///
215    /// It'll panic if `Opts::try_from(opts)` returns error.
216    pub fn new<O>(opts: O) -> Pool
217    where
218        Opts: TryFrom<O>,
219        <Opts as TryFrom<O>>::Error: std::error::Error,
220    {
221        let opts = Opts::try_from(opts).unwrap();
222        let pool_opts = opts.pool_opts().clone();
223        let (tx, rx) = mpsc::unbounded_channel();
224        Pool {
225            opts,
226            inner: Arc::new(Inner {
227                close: false.into(),
228                closed: false.into(),
229                metrics: Arc::new(Metrics::default()),
230                exchange: Mutex::new(Exchange {
231                    available: VecDeque::with_capacity(pool_opts.constraints().max()),
232                    waiting: Waitlist::default(),
233                    exist: 0,
234                    recycler: Some((rx, pool_opts)),
235                }),
236            }),
237            drop: tx,
238        }
239    }
240
241    /// Returns metrics for the connection pool.
242    pub fn metrics(&self) -> Arc<Metrics> {
243        self.inner.metrics.clone()
244    }
245
246    /// Creates a new pool of connections.
247    pub fn from_url<T: AsRef<str>>(url: T) -> Result<Pool> {
248        let opts = Opts::from_str(url.as_ref())?;
249        Ok(Pool::new(opts))
250    }
251
252    /// Async function that resolves to `Conn`.
253    pub fn get_conn(&self) -> GetConn {
254        let reset_connection = self.opts.pool_opts().reset_connection();
255        GetConn::new(self, reset_connection)
256    }
257
258    /// Starts a new transaction.
259    pub async fn start_transaction(&self, options: TxOpts) -> Result<Transaction<'static>> {
260        let conn = self.get_conn().await?;
261        Transaction::new(conn, options).await
262    }
263
    /// Async function that disconnects this pool from the server and resolves to `()`.
    ///
    /// **Note:** This future won't resolve until all active connections taken from the pool
    /// are dropped or disconnected. Also, all pending and new `GetConn`s will resolve to an error.
    pub fn disconnect(self) -> DisconnectPool {
        DisconnectPool::new(self)
    }
271
    /// A way to return connection taken from a pool.
    ///
    /// The connection is only forwarded to the recycler; nothing else happens here.
    fn return_conn(&mut self, conn: Conn) {
        // NOTE: we're not in async context here, so we can't block or return NotReady
        // any and all cleanup work _has_ to be done in the spawned recycler
        self.send_to_recycler(conn);
    }
278
279    fn send_to_recycler(&self, conn: Conn) {
280        if let Err(conn) = self.drop.send(Some(conn)) {
281            let conn = conn.0.unwrap();
282
283            // This _probably_ means that the Runtime is shutting down, and that the Recycler was
284            // dropped rather than allowed to exit cleanly.
285            if !self.inner.closed.load(atomic::Ordering::SeqCst) {
286                // Yup, Recycler was forcibly dropped!
287                // All we can do here is try the non-pool drop path for Conn.
288                assert!(conn.inner.pool.is_none());
289                drop(conn);
290            } else {
291                unreachable!("Recycler exited while connections still exist");
292            }
293        }
294    }
295
296    /// Indicate that a connection failed to be created and release it.
297    ///
298    /// Decreases the exist counter since a broken or dropped connection should not count towards
299    /// the total.
300    pub(super) fn cancel_connection(&self) {
301        let mut exchange = self.inner.exchange.lock().unwrap();
302        exchange.exist -= 1;
303        self.inner
304            .metrics
305            .create_failed
306            .fetch_add(1, atomic::Ordering::Relaxed);
307        // we just enabled the creation of a new connection!
308        if let Some(w) = exchange.waiting.pop() {
309            w.wake();
310        }
311    }
312
    /// Poll the pool for an available connection.
    ///
    /// `queue_id` identifies the request and determines its priority among waiters;
    /// `cx` supplies the waker that is queued when the request has to wait.
    ///
    /// Outcomes, in the order they are tried:
    /// - `Ready(Err(PoolDisconnected))` if the pool is closing;
    /// - `Pending` if a queued request outranks this one (the waker is queued);
    /// - `Ready(Ok(GetConnInner::Checking(..)))` wrapping a future that checks an idle,
    ///   non-expired connection before yielding it;
    /// - `Ready(Ok(GetConnInner::Connecting(..)))` wrapping a future that opens a new
    ///   connection, if the `max` constraint allows one more;
    /// - `Pending` otherwise (the waker is queued).
    fn poll_new_conn(
        &mut self,
        cx: &mut Context<'_>,
        queue_id: QueueId,
    ) -> Poll<Result<GetConnInner>> {
        let mut exchange = self.inner.exchange.lock().unwrap();

        // NOTE: this load must happen while we hold the lock,
        // otherwise the recycler may choose to exit, see that .exist == 0, and then exit,
        // and then we decide to create a new connection, which would then never be torn down.
        if self.inner.close.load(atomic::Ordering::Acquire) {
            return Err(Error::Driver(DriverError::PoolDisconnected)).into();
        }

        // No-op after the first call: the recycler slot is emptied once the tasks are spawned.
        exchange.spawn_futures_if_needed(&self.inner);

        // Check if we are higher priority than anything current
        // (`QueueId` compares in reverse, so an id handed out earlier is "greater").
        let highest = if let Some(cur) = exchange.waiting.peek_id() {
            queue_id > cur
        } else {
            true
        };

        // If we are not, just queue
        if !highest {
            // Only count the request in the metric when `push` reports a new entry.
            if exchange.waiting.push(cx.waker().clone(), queue_id) {
                self.inner
                    .metrics
                    .active_wait_requests
                    .fetch_add(1, atomic::Ordering::Relaxed);
            }
            return Poll::Pending;
        }

        // Go through idle connections from the back of the deque until a non-expired one
        // is found; expired ones are handed to the recycler.
        #[allow(unused_variables)] // `since` is only used when `hdrhistogram` is enabled
        while let Some(IdlingConn { mut conn, since }) = exchange.available.pop_back() {
            self.inner
                .metrics
                .connections_in_pool
                .fetch_sub(1, atomic::Ordering::Relaxed);

            if !conn.expired() {
                #[cfg(feature = "hdrhistogram")]
                self.inner
                    .metrics
                    .connection_idle_duration
                    .lock()
                    .unwrap()
                    .saturating_record(since.elapsed().as_micros() as u64);
                #[cfg(feature = "hdrhistogram")]
                let metrics = self.metrics();
                // Restart the activity clock; the check duration below is measured from here.
                conn.inner.active_since = Instant::now();
                return Poll::Ready(Ok(GetConnInner::Checking(
                    async move {
                        conn.stream_mut()?.check().await?;
                        #[cfg(feature = "hdrhistogram")]
                        metrics
                            .check_duration
                            .lock()
                            .unwrap()
                            .saturating_record(
                                conn.inner.active_since.elapsed().as_micros() as u64
                            );
                        Ok(conn)
                    }
                    .boxed(),
                )));
            } else {
                self.send_to_recycler(conn);
            }
        }

        // we didn't _immediately_ get one -- try to make one
        // we first try to just do a load so we don't do an unnecessary add then sub
        if exchange.exist < self.opts.pool_opts().constraints().max() {
            // we are allowed to make a new connection, so we will!
            // The slot is reserved before the connection exists; `cancel_connection`
            // is the counterpart that decreases `exist` again.
            exchange.exist += 1;

            self.inner
                .metrics
                .connection_count
                .fetch_add(1, atomic::Ordering::Relaxed);

            let opts = self.opts.clone();
            #[cfg(feature = "hdrhistogram")]
            let metrics = self.metrics();

            return Poll::Ready(Ok(GetConnInner::Connecting(
                async move {
                    let conn = Conn::new(opts).await;
                    #[cfg(feature = "hdrhistogram")]
                    if let Ok(conn) = &conn {
                        metrics
                            .connect_duration
                            .lock()
                            .unwrap()
                            .saturating_record(
                                conn.inner.active_since.elapsed().as_micros() as u64
                            );
                    }
                    conn
                }
                .boxed(),
            )));
        }

        // Polled, but no conn available? Back into the queue.
        if exchange.waiting.push(cx.waker().clone(), queue_id) {
            self.inner
                .metrics
                .active_wait_requests
                .fetch_add(1, atomic::Ordering::Relaxed);
        }
        Poll::Pending
    }
429
430    fn unqueue(&self, queue_id: QueueId) {
431        let mut exchange = self.inner.exchange.lock().unwrap();
432        if exchange.waiting.remove(queue_id) {
433            self.inner
434                .metrics
435                .active_wait_requests
436                .fetch_sub(1, atomic::Ordering::Relaxed);
437        }
438    }
439}
440
441impl Drop for Conn {
442    fn drop(&mut self) {
443        self.inner.infile_handler = None;
444
445        if std::thread::panicking() {
446            // Try to decrease the number of existing connections.
447            if let Some(pool) = self.inner.pool.take() {
448                pool.cancel_connection();
449            }
450
451            return;
452        }
453
454        if let Some(mut pool) = self.inner.pool.take() {
455            pool.return_conn(self.take());
456        } else if self.inner.stream.is_some() && !self.inner.disconnected {
457            crate::conn::disconnect(self.take());
458        }
459    }
460}
461
462#[cfg(test)]
463mod test {
464    use futures_util::{
465        future::{join_all, select, select_all, try_join_all, Either},
466        poll, try_join, FutureExt,
467    };
468    use tokio::time::{sleep, timeout};
469    use waker_fn::waker_fn;
470
471    use std::{
472        cmp::Reverse,
473        future::Future,
474        pin::pin,
475        sync::{Arc, OnceLock},
476        task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
477        time::Duration,
478    };
479
480    use crate::{
481        conn::pool::{Pool, QueueId, Waitlist, QUEUE_END_ID},
482        opts::PoolOpts,
483        prelude::*,
484        test_misc::get_opts,
485        PoolConstraints, Row, TxOpts, Value,
486    };
487
    // Reads `$field` of the exchange of the pool that `$conn` is attached to.
    // Panics if `$conn` has no pool. `ex_field!` is resolved where this macro is
    // expanded, so it may be defined further down.
    macro_rules! conn_ex_field {
        ($conn:expr, $field:tt) => {
            ex_field!($conn.inner.pool.as_ref().unwrap(), $field)
        };
    }
493
    // Locks the exchange of `$pool` and evaluates to its `$field`.
    // The lock guard is a temporary, so it is released at the end of the enclosing statement.
    macro_rules! ex_field {
        ($pool:expr, $field:tt) => {
            $pool.inner.exchange.lock().unwrap().$field
        };
    }
499
500    fn pool_with_one_connection() -> Pool {
501        let pool_opts = PoolOpts::new().with_constraints(PoolConstraints::new(1, 1).unwrap());
502        let opts = get_opts().pool_opts(pool_opts.clone());
503        Pool::new(opts)
504    }
505
    /// Checks that session state (`@foo`) is cleared between checkouts by default, and that
    /// this can be turned off per connection (`reset_connection(false)`) or per pool
    /// (`with_reset_connection(false)`).
    #[tokio::test]
    async fn should_opt_out_of_connection_reset() -> super::Result<()> {
        // A single-connection pool, so that every checkout reuses the same connection.
        let pool_opts = PoolOpts::new().with_constraints(PoolConstraints::new(1, 1).unwrap());
        let opts = get_opts().pool_opts(pool_opts.clone());

        let pool = Pool::new(opts.clone());

        let mut conn = pool.get_conn().await.unwrap();
        assert_eq!(
            conn.query_first::<Value, _>("SELECT @foo").await?.unwrap(),
            Value::NULL
        );
        conn.query_drop("SET @foo = 'foo'").await?;
        assert_eq!(
            conn.query_first::<String, _>("SELECT @foo").await?.unwrap(),
            "foo",
        );
        drop(conn);

        // Default behavior: the variable set above must be gone after the round trip.
        conn = pool.get_conn().await.unwrap();
        assert_eq!(
            conn.query_first::<Value, _>("SELECT @foo").await?.unwrap(),
            Value::NULL
        );
        conn.query_drop("SET @foo = 'foo'").await?;
        // Per-connection opt-out.
        conn.reset_connection(false);
        drop(conn);

        // The variable is expected to survive this time.
        conn = pool.get_conn().await.unwrap();
        assert_eq!(
            conn.query_first::<String, _>("SELECT @foo").await?.unwrap(),
            "foo",
        );
        drop(conn);
        pool.disconnect().await.unwrap();

        // Pool-wide opt-out.
        let pool = Pool::new(opts.pool_opts(pool_opts.with_reset_connection(false)));
        conn = pool.get_conn().await.unwrap();
        conn.query_drop("SET @foo = 'foo'").await?;
        drop(conn);
        conn = pool.get_conn().await.unwrap();
        assert_eq!(
            conn.query_first::<String, _>("SELECT @foo").await?.unwrap(),
            "foo",
        );
        drop(conn);
        pool.disconnect().await
    }
554
555    #[test]
556    fn should_not_hang() -> super::Result<()> {
557        pub struct Database {
558            pool: Pool,
559        }
560
561        impl Database {
562            pub async fn disconnect(self) -> super::Result<()> {
563                self.pool.disconnect().await?;
564                Ok(())
565            }
566        }
567
568        let runtime = tokio::runtime::Runtime::new().unwrap();
569        let database = Database {
570            pool: Pool::new(get_opts()),
571        };
572        runtime.block_on(database.disconnect())
573    }
574
575    #[tokio::test]
576    async fn should_track_conn_if_disconnected_outside_of_a_pool() -> super::Result<()> {
577        let pool = Pool::new(get_opts());
578        let conn = pool.get_conn().await?;
579        conn.disconnect().await?;
580        pool.disconnect().await?;
581        Ok(())
582    }
583
584    #[tokio::test]
585    async fn should_connect() -> super::Result<()> {
586        let pool = Pool::new(crate::Opts::from(get_opts()));
587        pool.get_conn().await?.ping().await?;
588        pool.disconnect().await?;
589        Ok(())
590    }
591
    /// Kills pooled connections on the server side (through a separate `master`
    /// connection) and checks that the pool still serves a working connection afterwards
    /// and discards the broken ones. Runs once with default options and once with
    /// `prefer_socket(false)`.
    #[tokio::test]
    async fn should_reconnect() -> super::Result<()> {
        let mut master = crate::Conn::new(get_opts()).await?;

        async fn test(master: &mut crate::Conn, opts: crate::OptsBuilder) -> super::Result<()> {
            const NUM_CONNS: usize = 5;
            let pool = Pool::new(opts);

            // create some conns..
            let connections = (0..NUM_CONNS).map(|_| {
                async {
                    let mut conn = pool.get_conn().await?;
                    conn.ping().await?;
                    crate::Result::Ok(conn)
                }
                .boxed()
            });

            // collect ids..
            // (the connections themselves are dropped here, i.e. returned to the pool)
            let ids = try_join_all(connections)
                .await?
                .into_iter()
                .map(|conn| conn.id())
                .collect::<Vec<_>>();

            // give some time to reset connections
            sleep(Duration::from_millis(1000)).await;

            // get_conn should work if connection is available and alive
            pool.get_conn().await?;

            // now we'll kill connections..
            for id in ids {
                master.query_drop(format!("KILL {}", id)).await?;
            }

            // now check, that they're still in the pool..
            assert_eq!(ex_field!(pool, available).len(), NUM_CONNS);

            sleep(Duration::from_millis(500)).await;

            // now get new connection..
            let _conn = pool.get_conn().await?;

            // now check, that broken connections are dropped
            assert_eq!(ex_field!(pool, available).len(), 0);

            drop(_conn);
            pool.disconnect().await
        }

        println!("Check socket/pipe..");
        test(&mut master, get_opts()).await?;

        println!("Check tcp..");
        test(&mut master, get_opts().prefer_socket(false)).await?;

        master.disconnect().await
    }
651
    /// With a single-connection pool, repeated checkouts are expected to yield the same
    /// server connection id. The id check is skipped for servers older than 5.7.2.
    #[tokio::test]
    async fn should_reuse_connections() -> super::Result<()> {
        let pool = pool_with_one_connection();
        let mut conn = pool.get_conn().await?;

        let server_version = conn.server_version();
        let connection_id = conn.id();

        for _ in 0..16 {
            // Return the connection to the pool, then check it out again.
            drop(conn);
            conn = pool.get_conn().await?;
            println!("CONN connection_id={}", conn.id());
            assert!(conn.id() == connection_id || server_version < (5, 7, 2));
        }

        Ok(())
    }
669
    /// Stress test (ignored by default): ten rounds of 10 000 concurrent tasks, each of
    /// which takes a connection from the pool and reports completion over a channel.
    #[tokio::test]
    #[ignore]
    async fn can_handle_the_pressure() {
        let pool = Pool::new(get_opts());
        for _ in 0..10i32 {
            let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
            for i in 0..10_000 {
                let pool = pool.clone();
                let tx = tx.clone();
                tokio::spawn(async move {
                    // `let _ =` drops the connection right away, returning it to the pool.
                    let _ = pool.get_conn().await.unwrap();
                    tx.send(i).unwrap();
                });
            }
            // Drop the original sender so `recv` yields `None` once every task is done.
            drop(tx);
            // see that all the tx's eventually complete
            while (rx.recv().await).is_some() {}
        }
        drop(pool);
    }
690
    /// Rows inserted in a transaction that is dropped without commit must not be visible
    /// afterwards.
    #[tokio::test]
    async fn should_start_transaction() -> super::Result<()> {
        let pool = pool_with_one_connection();

        "CREATE TABLE IF NOT EXISTS mysql.tmp(id int)"
            .ignore(&pool)
            .await?;
        "DELETE FROM mysql.tmp".ignore(&pool).await?;

        let mut tx = pool.start_transaction(TxOpts::default()).await?;
        tx.exec_batch(
            "INSERT INTO mysql.tmp (id) VALUES (?)",
            vec![(1_u8,), (2_u8,)],
        )
        .await?;
        tx.exec_drop("SELECT * FROM mysql.tmp", ()).await?;
        // Dropped without commit; the count below expects the inserts to be gone.
        drop(tx);
        let row_opt = pool
            .get_conn()
            .await?
            .query_first("SELECT COUNT(*) FROM mysql.tmp")
            .await?;
        assert_eq!(row_opt, Some((0u8,)));
        pool.get_conn()
            .await?
            .query_drop("DROP TABLE mysql.tmp")
            .await?;
        pool.disconnect().await?;
        Ok(())
    }
721
    /// With an inactive-connection TTL configured, idle connections above `POOL_MIN` are
    /// expected to be removed once the TTL check interval has run.
    #[tokio::test]
    async fn should_check_inactive_connection_ttl() -> super::Result<()> {
        const POOL_MIN: usize = 5;
        const POOL_MAX: usize = 10;

        const INACTIVE_CONNECTION_TTL: Duration = Duration::from_millis(500);
        const TTL_CHECK_INTERVAL: Duration = Duration::from_secs(1);

        let constraints = PoolConstraints::new(POOL_MIN, POOL_MAX).unwrap();
        let pool_opts = PoolOpts::default()
            .with_constraints(constraints)
            .with_inactive_connection_ttl(INACTIVE_CONNECTION_TTL)
            .with_ttl_check_interval(TTL_CHECK_INTERVAL);

        let pool = Pool::new(get_opts().pool_opts(pool_opts));
        let pool_clone = pool.clone();
        // Check out POOL_MAX connections at once to fill the pool to its limit.
        let conns = (0..POOL_MAX).map(|_| pool.get_conn()).collect::<Vec<_>>();

        let conns = try_join_all(conns).await?;

        assert_eq!(ex_field!(pool_clone, exist), POOL_MAX);
        drop(conns);

        // wait for a bit to let the connections be reclaimed
        sleep(Duration::from_millis(100)).await;

        // check that connections are still in the pool because of inactive_connection_ttl
        assert_eq!(ex_field!(pool_clone, available).len(), POOL_MAX);

        // then, wait for ttl_check_interval
        sleep(TTL_CHECK_INTERVAL).await;

        // wait a bit more to let the connections be reclaimed by the ttl check
        sleep(Duration::from_millis(500)).await;

        // check that we have the expected number of connections
        assert_eq!(ex_field!(pool_clone, available).len(), POOL_MIN);

        Ok(())
    }
762
    /// Without an inactive-connection TTL, the pool is expected to keep at most
    /// `POOL_MIN` idle connections: connections returned beyond that are closed, which
    /// shows up in the `exist` counter.
    #[tokio::test]
    async fn aa_should_hold_bounds2() -> super::Result<()> {
        use std::cmp::min;

        const POOL_MIN: usize = 5;
        const POOL_MAX: usize = 10;

        let constraints = PoolConstraints::new(POOL_MIN, POOL_MAX).unwrap();
        let pool_opts = PoolOpts::default().with_constraints(constraints);

        let pool = Pool::new(get_opts().pool_opts(pool_opts));
        let pool_clone = pool.clone();
        let conns = (0..POOL_MAX).map(|_| pool.get_conn()).collect::<Vec<_>>();

        let mut conns = try_join_all(conns).await?;

        // we want to continuously drop connections
        // and check that they are _actually_ dropped until we reach POOL_MIN
        assert_eq!(ex_field!(pool_clone, exist), POOL_MAX);

        while !conns.is_empty() {
            // first, drop a connection
            let _ = conns.pop();

            // then, wait for a bit to let the connection be reclaimed
            sleep(Duration::from_millis(500)).await;

            // now check that we have the expected # of connections
            // this may look a little funky, but think of it this way:
            //
            //  - if we hold all 10 connections, we expect 10
            //  - if we drop one,  we still expect 10, because POOL_MIN limits
            //    the number of _idle_ connections (of which there is only 1)
            //  - once we've dropped 5, there are now 5 idle connections. thus,
            //    if we drop one more, we _now_ expect there to be only 9
            //    connections total (no more connections should be pushed to
            //    idle).
            let dropped = POOL_MAX - conns.len();
            let idle = min(dropped, POOL_MIN);
            let expected = conns.len() + idle;

            // check that we have the expected number of connections
            let have = ex_field!(pool_clone, exist);
            assert_eq!(have, expected);
        }

        pool.disconnect().await?;
        Ok(())
    }
812
    /// With constraints (min = 1, max = 2): two concurrent checkouts create two
    /// connections, and after returning both at most one stays idle.
    #[tokio::test]
    async fn should_hold_bounds1() -> super::Result<()> {
        let constraints = PoolConstraints::new(1, 2).unwrap();
        let opts = get_opts().pool_opts(PoolOpts::default().with_constraints(constraints));
        let pool = Pool::new(opts);
        let pool_clone = pool.clone();

        let (conn1, conn2) = try_join!(pool.get_conn(), pool.get_conn()).unwrap();

        // Both connections are checked out, so nothing is idle.
        assert_eq!(conn_ex_field!(conn1, exist), 2);
        assert_eq!(conn_ex_field!(conn1, available).len(), 0);

        drop(conn1);
        drop(conn2);
        // only one of conn1 and conn2 should have gone to idle,
        // and should have immediately been picked up by new_conn (now conn1)
        let conn1 = pool_clone.get_conn().await?;
        assert_eq!(conn_ex_field!(conn1, available).len(), 0);

        drop(conn1);

        // the connection should be returned to idle
        // (but may not have been returned _yet_)
        assert!(ex_field!(pool, available).len() <= 1);
        pool.disconnect().await?;
        Ok(())
    }
840
841    // Test that connections which err do not count towards the connection count in the pool.
842    #[tokio::test]
843    async fn should_hold_bounds_on_error() -> super::Result<()> {
844        // Should not be possible to connect to broadcast address.
845        let pool = Pool::new("mysql://255.255.255.255");
846
847        assert!(try_join!(pool.get_conn(), pool.get_conn()).is_err());
848        assert_eq!(ex_field!(pool, exist), 0);
849        Ok(())
850    }
851
    /// Lowers the server's global `wait_timeout` to 3 seconds, lets an idle pooled
    /// connection sit for longer than that, and checks that the next checkout yields a
    /// different connection while `exist` stays at 1. The original timeout is restored
    /// at the end.
    #[tokio::test]
    async fn zz_should_check_wait_timeout_on_get_conn() -> super::Result<()> {
        let pool = Pool::new(get_opts());

        let mut conn = pool.get_conn().await?;
        let wait_timeout_orig: Option<usize> = conn.query_first("SELECT @@wait_timeout").await?;
        conn.query_drop("SET GLOBAL wait_timeout = 3").await?;
        // Disconnect this connection so the next checkout does not reuse it.
        conn.disconnect().await?;

        let mut conn = pool.get_conn().await?;
        let wait_timeout: Option<usize> = conn.query_first("SELECT @@wait_timeout").await?;
        let id1: Option<usize> = conn.query_first("SELECT CONNECTION_ID()").await?;
        drop(conn);

        assert_eq!(wait_timeout, Some(3));
        assert_eq!(ex_field!(pool, exist), 1);

        // Idle for longer than the 3-second `wait_timeout` set above.
        sleep(Duration::from_secs(6)).await;

        let mut conn = pool.get_conn().await?;
        let id2: Option<usize> = conn.query_first("SELECT CONNECTION_ID()").await?;
        assert_eq!(ex_field!(pool, exist), 1);
        assert_ne!(id1, id2);

        conn.exec_drop("SET GLOBAL wait_timeout = ?", (wait_timeout_orig,))
            .await?;
        drop(conn);

        pool.disconnect().await?;

        Ok(())
    }
884
    /// Dropping the pool and its connections must work in either order, without an
    /// explicit `disconnect`.
    #[tokio::test]
    async fn droptest() -> super::Result<()> {
        // Connections first, then the pool.
        let pool = Pool::new(get_opts());
        let conns = try_join_all((0..10).map(|_| pool.get_conn()))
            .await
            .unwrap();
        drop(conns);
        drop(pool);

        // Pool first, then the connections.
        let pool = Pool::new(get_opts());
        let conns = try_join_all((0..10).map(|_| pool.get_conn()))
            .await
            .unwrap();
        drop(pool);
        drop(conns);
        Ok(())
    }
902
903    #[test]
904    fn drop_impl_for_conn_should_not_panic_within_unwind() {
905        use tokio::runtime;
906
907        const PANIC_MESSAGE: &str = "ORIGINAL_PANIC";
908
909        let result = std::panic::catch_unwind(|| {
910            runtime::Builder::new_current_thread()
911                .enable_all()
912                .build()
913                .unwrap()
914                .block_on(async {
915                    let pool = Pool::new(get_opts());
916                    let _conn = pool.get_conn().await.unwrap();
917                    std::panic::panic_any(PANIC_MESSAGE);
918                });
919        });
920
921        assert_eq!(
922            *result.unwrap_err().downcast::<&str>().unwrap(),
923            "ORIGINAL_PANIC",
924        );
925    }
926
927    #[test]
928    fn should_not_panic_on_unclean_shutdown() {
929        // run more than once to trigger different drop orders
930        for _ in 0..10 {
931            let rt = tokio::runtime::Runtime::new().unwrap();
932            let (tx, rx) = tokio::sync::oneshot::channel();
933            rt.block_on(async move {
934                let pool = Pool::new(get_opts());
935                let mut c = pool.get_conn().await.unwrap();
936                tokio::spawn(async move {
937                    let _ = rx.await;
938                    let _ = c.query_drop("SELECT 1").await;
939                });
940            });
941            drop(rt);
942            // c is still active here, so if anything it's been forcibly dropped
943            let _ = tx.send(());
944        }
945    }
946
947    #[test]
948    fn should_perform_clean_shutdown() {
949        // run more than once to trigger different drop orders
950        for _ in 0..10 {
951            let rt = tokio::runtime::Runtime::new().unwrap();
952            let (tx, rx) = tokio::sync::oneshot::channel();
953            let jh = rt.spawn(async move {
954                let pool = Pool::new(get_opts());
955                let mut c = pool.get_conn().await.unwrap();
956                tokio::spawn(async move {
957                    let _ = rx.await;
958                    let _ = c.query_drop("SELECT 1").await;
959                });
960                let _ = pool.disconnect().await;
961            });
962            let _ = tx.send(());
963            rt.block_on(jh).unwrap();
964        }
965    }
966
967    #[tokio::test]
968    async fn issue_126_should_cleanup_errors_in_multiresult_sets() -> super::Result<()> {
969        let pool_constraints = PoolConstraints::new(0, 1).unwrap();
970        let pool_opts = PoolOpts::default().with_constraints(pool_constraints);
971
972        let pool = Pool::new(get_opts().pool_opts(pool_opts));
973
974        for _ in 0u8..100 {
975            pool.get_conn()
976                .await?
977                .query_iter("DO '42'; BLABLA;")
978                .await?;
979        }
980
981        Ok(())
982    }
983
984    #[tokio::test]
985    async fn should_ignore_non_fatal_errors_while_returning_to_a_pool() -> super::Result<()> {
986        let pool = pool_with_one_connection();
987        let id = pool.get_conn().await?.id();
988
989        // non-fatal errors are ignored
990        for _ in 0u8..10 {
991            let mut conn = pool.get_conn().await?;
992            conn.query_iter("DO '42'; BLABLA;").await?;
993            assert_eq!(id, conn.id());
994        }
995
996        Ok(())
997    }
998
999    #[tokio::test]
1000    async fn should_remove_waker_of_cancelled_task() {
1001        let pool = pool_with_one_connection();
1002        let only_conn = pool.get_conn().await.unwrap();
1003
1004        let join_handle = tokio::spawn(timeout(Duration::from_secs(1), pool.get_conn()));
1005
1006        sleep(Duration::from_secs(2)).await;
1007
1008        match join_handle.await.unwrap() {
1009            Err(_elapsed) => (),
1010            _ => panic!("unexpected Ok()"),
1011        }
1012        drop(only_conn);
1013
1014        assert_eq!(0, pool.inner.exchange.lock().unwrap().waiting.queue.len());
1015    }
1016
1017    #[tokio::test]
1018    async fn should_work_if_pooled_connection_operation_is_cancelled() -> super::Result<()> {
1019        let pool = Pool::new(get_opts());
1020
1021        // warm up
1022        join_all((0..10).map(|_| pool.get_conn())).await;
1023
1024        /// some operation
1025        async fn op(pool: &Pool) {
1026            let _: Option<Row> = pool
1027                .get_conn()
1028                .await
1029                .unwrap()
1030                .exec_first("SELECT ?, ?", (42, "foo"))
1031                .await
1032                .unwrap();
1033        }
1034
1035        // Measure the delay
1036        let mut max_delay = 0_u128;
1037        for _ in 0..10_usize {
1038            let start = std::time::Instant::now();
1039            op(&pool).await;
1040            max_delay = std::cmp::max(max_delay, start.elapsed().as_micros());
1041        }
1042
1043        for _ in 0_usize..128 {
1044            let fut = select_all((0_usize..5).map(|_| op(&pool).boxed()));
1045
1046            // we need to cancel the op in the middle
1047            // this should not lead to the `packet out of order` error.
1048            let delay_micros = rand::random::<u128>() % max_delay;
1049            select(
1050                sleep(Duration::from_micros(delay_micros as u64)).boxed(),
1051                fut,
1052            )
1053            .await;
1054
1055            // give some time for connections to return to the pool
1056            sleep(Duration::from_millis(100)).await;
1057        }
1058        Ok(())
1059    }
1060
1061    #[test]
1062    fn waitlist_integrity() {
1063        const DATA: *const () = &();
1064        const NOOP_CLONE_FN: unsafe fn(*const ()) -> RawWaker = |_| RawWaker::new(DATA, &RW_VTABLE);
1065        const NOOP_FN: unsafe fn(*const ()) = |_| {};
1066        static RW_VTABLE: RawWakerVTable =
1067            RawWakerVTable::new(NOOP_CLONE_FN, NOOP_FN, NOOP_FN, NOOP_FN);
1068        let w = unsafe { Waker::from_raw(RawWaker::new(DATA, &RW_VTABLE)) };
1069
1070        let mut waitlist = Waitlist::default();
1071        assert_eq!(0, waitlist.queue.len());
1072
1073        waitlist.push(w.clone(), QueueId(Reverse(4)));
1074        waitlist.push(w.clone(), QueueId(Reverse(2)));
1075        waitlist.push(w.clone(), QueueId(Reverse(8)));
1076        waitlist.push(w.clone(), QUEUE_END_ID);
1077        waitlist.push(w.clone(), QueueId(Reverse(10)));
1078
1079        waitlist.remove(QueueId(Reverse(8)));
1080
1081        assert_eq!(4, waitlist.queue.len());
1082
1083        let (_, id) = waitlist.queue.pop().unwrap();
1084        assert_eq!(2, id.0 .0);
1085        let (_, id) = waitlist.queue.pop().unwrap();
1086        assert_eq!(4, id.0 .0);
1087        let (_, id) = waitlist.queue.pop().unwrap();
1088        assert_eq!(10, id.0 .0);
1089        let (_, id) = waitlist.queue.pop().unwrap();
1090        assert_eq!(QUEUE_END_ID, id);
1091
1092        assert_eq!(0, waitlist.queue.len());
1093    }
1094
1095    #[tokio::test]
1096    async fn check_absolute_connection_ttl() -> super::Result<()> {
1097        let constraints = PoolConstraints::new(1, 3).unwrap();
1098        let pool_opts = PoolOpts::default()
1099            .with_constraints(constraints)
1100            .with_inactive_connection_ttl(Duration::from_secs(99))
1101            .with_ttl_check_interval(Duration::from_secs(1))
1102            .with_abs_conn_ttl(Some(Duration::from_secs(2)));
1103
1104        let pool = Pool::new(get_opts().pool_opts(pool_opts));
1105
1106        let conn_ttl0 = pool.get_conn().await?;
1107        sleep(Duration::from_millis(1000)).await;
1108        let conn_ttl1 = pool.get_conn().await?;
1109        sleep(Duration::from_millis(1000)).await;
1110        let conn_ttl2 = pool.get_conn().await?;
1111
1112        drop(conn_ttl0);
1113        drop(conn_ttl1);
1114        drop(conn_ttl2);
1115        assert_eq!(ex_field!(pool, exist), 3);
1116
1117        sleep(Duration::from_millis(1500)).await;
1118        assert_eq!(ex_field!(pool, exist), 2);
1119
1120        sleep(Duration::from_millis(1000)).await;
1121        assert_eq!(ex_field!(pool, exist), 1);
1122
1123        // Go even below min pool size.
1124        sleep(Duration::from_millis(1000)).await;
1125        assert_eq!(ex_field!(pool, exist), 0);
1126
1127        Ok(())
1128    }
1129
1130    #[tokio::test]
1131    async fn save_last_waker() {
1132        // Test that if passed multiple wakers, we call the last one.
1133
1134        let pool = pool_with_one_connection();
1135
1136        // Get a connection, so we know the next future will be
1137        // queued.
1138        let conn = pool.get_conn().await.unwrap();
1139        let mut pending_fut = pin!(pool.get_conn());
1140
1141        let build_waker = || {
1142            let called = Arc::new(OnceLock::new());
1143            let called2 = called.clone();
1144            let waker = waker_fn(move || called2.set(()).unwrap());
1145            (called, waker)
1146        };
1147
1148        let mut assert_pending = |waker| {
1149            let mut context = Context::from_waker(&waker);
1150            let p = pending_fut.as_mut().poll(&mut context);
1151            assert!(matches!(p, Poll::Pending));
1152        };
1153
1154        let (first_called, waker) = build_waker();
1155        assert_pending(waker);
1156
1157        let (second_called, waker) = build_waker();
1158        assert_pending(waker);
1159
1160        drop(conn);
1161
1162        while second_called.get().is_none() {
1163            assert!(first_called.get().is_none());
1164            tokio::time::sleep(Duration::from_millis(100)).await;
1165        }
1166
1167        assert!(first_called.get().is_none());
1168    }
1169
1170    #[tokio::test]
1171    async fn check_priorities() -> super::Result<()> {
1172        let pool = pool_with_one_connection();
1173
1174        let queue_len = || {
1175            let exchange = pool.inner.exchange.lock().unwrap();
1176            exchange.waiting.queue.len()
1177        };
1178
1179        // Get a connection, so we know the next futures will be
1180        // queued.
1181        let conn = pool.get_conn().await.unwrap();
1182
1183        #[allow(clippy::async_yields_async)]
1184        let get_pending = || async {
1185            let fut = async {
1186                pool.get_conn().await.unwrap();
1187            }
1188            .shared();
1189            let p = poll!(fut.clone());
1190            assert!(matches!(p, Poll::Pending));
1191            fut
1192        };
1193
1194        let fut1 = get_pending().await;
1195        let fut2 = get_pending().await;
1196
1197        // Both futures are queued
1198        assert_eq!(queue_len(), 2);
1199
1200        drop(conn); // This will pop fut1 from the queue, making it [2]
1201        while queue_len() != 1 {
1202            tokio::time::sleep(Duration::from_millis(100)).await;
1203        }
1204
1205        // We called wake on fut1, and even with the select fut1 will
1206        // resolve first
1207        let Either::Right((_, fut2)) = select(fut2, fut1).await else {
1208            panic!("wrong future");
1209        };
1210
1211        // We dropped the connection of fut1, but very likely hasn't
1212        // made it through the recycler yet.
1213        assert_eq!(queue_len(), 1);
1214
1215        let p = poll!(fut2.clone());
1216        assert!(matches!(p, Poll::Pending));
1217        assert_eq!(queue_len(), 1); // The queue still has fut2
1218
1219        // The connection will pass by the recycler and unblock fut2
1220        // and pop it from the queue.
1221        fut2.await;
1222        assert_eq!(queue_len(), 0);
1223
1224        // The recycler is probably not done, so a new future will be
1225        // pending.
1226        let fut3 = get_pending().await;
1227        assert_eq!(queue_len(), 1);
1228
1229        // It is OK to await it.
1230        fut3.await;
1231
1232        Ok(())
1233    }
1234
1235    #[cfg(feature = "nightly")]
1236    mod bench {
1237        use futures_util::future::{FutureExt, TryFutureExt};
1238        use tokio::runtime::Runtime;
1239
1240        use crate::{prelude::Queryable, test_misc::get_opts, Pool, PoolConstraints, PoolOpts};
1241        use std::time::Duration;
1242
1243        #[bench]
1244        fn get_conn(bencher: &mut test::Bencher) {
1245            let mut runtime = Runtime::new().unwrap();
1246            let pool = Pool::new(get_opts());
1247
1248            bencher.iter(|| {
1249                let fut = pool
1250                    .get_conn()
1251                    .and_then(|mut conn| async { conn.ping().await.map(|_| conn) });
1252                runtime.block_on(fut).unwrap();
1253            });
1254
1255            runtime.block_on(pool.disconnect()).unwrap();
1256        }
1257
1258        #[bench]
1259        fn new_conn_on_pool_soft_boundary(bencher: &mut test::Bencher) {
1260            let mut runtime = Runtime::new().unwrap();
1261
1262            let pool_constraints = PoolConstraints::new(0, 1).unwrap();
1263            let pool_opts = PoolOpts::default()
1264                .with_constraints(pool_constraints)
1265                .with_inactive_connection_ttl(Duration::from_secs(1));
1266
1267            let pool = Pool::new(get_opts().pool_opts(pool_opts));
1268
1269            bencher.iter(|| {
1270                let fut = pool.get_conn().map(drop);
1271                runtime.block_on(fut);
1272            });
1273
1274            runtime.block_on(pool.disconnect()).unwrap();
1275        }
1276    }
1277}