mysql_async/conn/pool/
mod.rs

1// Copyright (c) 2016 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use futures_util::FutureExt;
10use keyed_priority_queue::KeyedPriorityQueue;
11use tokio::sync::mpsc;
12
13use std::{
14    borrow::Borrow,
15    cmp::Reverse,
16    collections::VecDeque,
17    hash::{Hash, Hasher},
18    str::FromStr,
19    sync::{atomic, Arc, Mutex},
20    task::{Context, Poll, Waker},
21    time::{Duration, Instant},
22};
23
24use crate::{
25    conn::{pool::futures::*, Conn},
26    error::*,
27    opts::{Opts, PoolOpts},
28    queryable::transaction::{Transaction, TxOpts},
29};
30
31pub use metrics::Metrics;
32
33mod recycler;
34// this is a really unfortunate name for a module
35pub mod futures;
36mod metrics;
37mod ttl_check_inerval;
38
39/// Connection that is idling in the pool.
40#[derive(Debug)]
41struct IdlingConn {
42    /// The connection is idling since this `Instant`.
43    since: Instant,
44    /// Idling connection.
45    conn: Conn,
46}
47
48impl IdlingConn {
49    /// Returns true when this connection has a TTL and it elapsed.
50    fn expired(&self) -> bool {
51        self.conn
52            .inner
53            .ttl_deadline
54            .map(|t| Instant::now() > t)
55            .unwrap_or_default()
56    }
57
58    /// Returns duration elapsed since this connection is idling.
59    fn elapsed(&self) -> Duration {
60        self.since.elapsed()
61    }
62}
63
64impl From<Conn> for IdlingConn {
65    fn from(conn: Conn) -> Self {
66        Self {
67            since: Instant::now(),
68            conn,
69        }
70    }
71}
72
73/// The exchange is where we track all connections as they come and go.
74///
75/// It is held under a single, non-asynchronous lock.
76/// This is fine as long as we never do expensive work while holding the lock!
77#[derive(Debug)]
78struct Exchange {
79    waiting: Waitlist,
80    available: VecDeque<IdlingConn>,
81    exist: usize,
82    // only used to spawn the recycler the first time we're in async context
83    recycler: Option<(mpsc::UnboundedReceiver<Option<Conn>>, PoolOpts)>,
84}
85
86impl Exchange {
87    /// This function will spawn the recycler for this pool
88    /// as well as the ttl check interval if `inactive_connection_ttl` isn't `0`.
89    fn spawn_futures_if_needed(&mut self, inner: &Arc<Inner>) {
90        use recycler::Recycler;
91        use ttl_check_inerval::TtlCheckInterval;
92        if let Some((dropped, pool_opts)) = self.recycler.take() {
93            // Spawn the Recycler.
94            tokio::spawn(Recycler::new(pool_opts.clone(), inner.clone(), dropped));
95
96            // Spawn the ttl check interval if `inactive_connection_ttl` isn't `0` or
97            // connections have an absolute TTL.
98            if pool_opts.inactive_connection_ttl() > Duration::ZERO
99                || pool_opts.abs_conn_ttl().is_some()
100            {
101                tokio::spawn(TtlCheckInterval::new(pool_opts, inner.clone()));
102            }
103        }
104    }
105}
106
107#[derive(Default, Debug)]
108struct Waitlist {
109    queue: KeyedPriorityQueue<QueuedWaker, QueueId>,
110}
111
112impl Waitlist {
113    /// Returns `true` if pushed.
114    fn push(&mut self, waker: Waker, queue_id: QueueId) -> bool {
115        // The documentation of Future::poll says:
116        //   Note that on multiple calls to poll, only the Waker from
117        //   the Context passed to the most recent call should be
118        //   scheduled to receive a wakeup.
119        //
120        // But the the documentation of KeyedPriorityQueue::push says:
121        //   Adds new element to queue if missing key or replace its
122        //   priority if key exists. In second case doesn’t replace key.
123        //
124        // This means we have to remove first to have the most recent
125        // waker in the queue.
126        let occupied = self.remove(queue_id);
127        self.queue.push(QueuedWaker { queue_id, waker }, queue_id);
128        !occupied
129    }
130
131    fn pop(&mut self) -> Option<Waker> {
132        match self.queue.pop() {
133            Some((qw, _)) => Some(qw.waker),
134            None => None,
135        }
136    }
137
138    /// Returns `true` if removed.
139    fn remove(&mut self, id: QueueId) -> bool {
140        self.queue.remove(&id).is_some()
141    }
142
143    fn peek_id(&mut self) -> Option<QueueId> {
144        self.queue.peek().map(|(qw, _)| qw.queue_id)
145    }
146}
147
/// Queue id wrapping `u64::MAX`; because of `Reverse` it orders below every id
/// that wraps a smaller number.
const QUEUE_END_ID: QueueId = QueueId(Reverse(u64::MAX));

/// Position of a waiter in the wait queue.
///
/// The inner counter is wrapped in `Reverse`, so an id created earlier compares
/// greater than an id created later.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub(crate) struct QueueId(Reverse<u64>);

impl QueueId {
    /// Hands out the next id from a process-wide, monotonically increasing counter.
    fn next() -> Self {
        static NEXT_QUEUE_ID: atomic::AtomicU64 = atomic::AtomicU64::new(0);
        Self(Reverse(
            NEXT_QUEUE_ID.fetch_add(1, atomic::Ordering::SeqCst),
        ))
    }
}
160
161#[derive(Debug)]
162struct QueuedWaker {
163    queue_id: QueueId,
164    waker: Waker,
165}
166
167impl Eq for QueuedWaker {}
168
169impl Borrow<QueueId> for QueuedWaker {
170    fn borrow(&self) -> &QueueId {
171        &self.queue_id
172    }
173}
174
175impl PartialEq for QueuedWaker {
176    fn eq(&self, other: &Self) -> bool {
177        self.queue_id == other.queue_id
178    }
179}
180
181impl Hash for QueuedWaker {
182    fn hash<H: Hasher>(&self, state: &mut H) {
183        self.queue_id.hash(state)
184    }
185}
186
187/// Connection pool data.
188#[derive(Debug)]
189pub struct Inner {
190    metrics: Arc<Metrics>,
191    close: atomic::AtomicBool,
192    closed: atomic::AtomicBool,
193    exchange: Mutex<Exchange>,
194}
195
196/// Asynchronous pool of MySql connections.
197///
198/// Actually `Pool` is a shared reference, i.e. every clone will lead to the same instance
199/// created with [`Pool::new`]. Also `Pool` satisfies `Send` and `Sync`, so you don't have to wrap
200/// it into an `Arc` or `Mutex`.
201///
202/// Note that you will probably want to await [`Pool::disconnect`] before dropping the runtime, as
203/// otherwise you may end up with a number of connections that are not cleanly terminated.
204#[derive(Debug, Clone)]
205pub struct Pool {
206    opts: Opts,
207    inner: Arc<Inner>,
208    drop: mpsc::UnboundedSender<Option<Conn>>,
209}
210
211impl Pool {
212    /// Creates a new pool of connections.
213    ///
214    /// # Panic
215    ///
216    /// It'll panic if `Opts::try_from(opts)` returns error.
217    pub fn new<O>(opts: O) -> Pool
218    where
219        Opts: TryFrom<O>,
220        <Opts as TryFrom<O>>::Error: std::error::Error,
221    {
222        let opts = Opts::try_from(opts).unwrap();
223        let pool_opts = opts.pool_opts().clone();
224        let (tx, rx) = mpsc::unbounded_channel();
225        Pool {
226            opts,
227            inner: Arc::new(Inner {
228                close: false.into(),
229                closed: false.into(),
230                metrics: Arc::new(Metrics::default()),
231                exchange: Mutex::new(Exchange {
232                    available: VecDeque::with_capacity(pool_opts.constraints().max()),
233                    waiting: Waitlist::default(),
234                    exist: 0,
235                    recycler: Some((rx, pool_opts)),
236                }),
237            }),
238            drop: tx,
239        }
240    }
241
242    /// Returns metrics for the connection pool.
243    pub fn metrics(&self) -> Arc<Metrics> {
244        self.inner.metrics.clone()
245    }
246
247    /// Creates a new pool of connections.
248    pub fn from_url<T: AsRef<str>>(url: T) -> Result<Pool> {
249        let opts = Opts::from_str(url.as_ref())?;
250        Ok(Pool::new(opts))
251    }
252
253    /// Async function that resolves to `Conn`.
254    pub fn get_conn(&self) -> GetConn {
255        let reset_connection = self.opts.pool_opts().reset_connection();
256        GetConn::new(self, reset_connection)
257    }
258
259    /// Starts a new transaction.
260    pub async fn start_transaction(&self, options: TxOpts) -> Result<Transaction<'static>> {
261        let conn = self.get_conn().await?;
262        Transaction::new(conn, options).await
263    }
264
265    /// Async function that disconnects this pool from the server and resolves to `()`.
266    ///
267    /// **Note:** This Future won't resolve until all active connections, taken from it,
268    /// are dropped or disonnected. Also all pending and new `GetConn`'s will resolve to error.
269    pub fn disconnect(self) -> DisconnectPool {
270        DisconnectPool::new(self)
271    }
272
273    /// A way to return connection taken from a pool.
274    fn return_conn(&mut self, conn: Conn) {
275        // NOTE: we're not in async context here, so we can't block or return NotReady
276        // any and all cleanup work _has_ to be done in the spawned recycler
277        self.send_to_recycler(conn);
278    }
279
280    fn send_to_recycler(&self, conn: Conn) {
281        if let Err(conn) = self.drop.send(Some(conn)) {
282            let conn = conn.0.unwrap();
283
284            // This _probably_ means that the Runtime is shutting down, and that the Recycler was
285            // dropped rather than allowed to exit cleanly.
286            if !self.inner.closed.load(atomic::Ordering::SeqCst) {
287                // Yup, Recycler was forcibly dropped!
288                // All we can do here is try the non-pool drop path for Conn.
289                assert!(conn.inner.pool.is_none());
290                drop(conn);
291            } else {
292                unreachable!("Recycler exited while connections still exist");
293            }
294        }
295    }
296
297    /// Indicate that a connection failed to be created and release it.
298    ///
299    /// Decreases the exist counter since a broken or dropped connection should not count towards
300    /// the total.
301    pub(super) fn cancel_connection(&self) {
302        let mut exchange = self.inner.exchange.lock().unwrap();
303        exchange.exist -= 1;
304        self.inner
305            .metrics
306            .create_failed
307            .fetch_add(1, atomic::Ordering::Relaxed);
308        self.inner
309            .metrics
310            .connection_count
311            .store(exchange.exist, atomic::Ordering::Relaxed);
312        // we just enabled the creation of a new connection!
313        if let Some(w) = exchange.waiting.pop() {
314            w.wake();
315        }
316    }
317
318    /// Poll the pool for an available connection.
319    fn poll_new_conn(
320        &mut self,
321        cx: &mut Context<'_>,
322        queue_id: QueueId,
323    ) -> Poll<Result<GetConnInner>> {
324        let mut exchange = self.inner.exchange.lock().unwrap();
325
326        // NOTE: this load must happen while we hold the lock,
327        // otherwise the recycler may choose to exit, see that .exist == 0, and then exit,
328        // and then we decide to create a new connection, which would then never be torn down.
329        if self.inner.close.load(atomic::Ordering::Acquire) {
330            return Err(Error::Driver(DriverError::PoolDisconnected)).into();
331        }
332
333        exchange.spawn_futures_if_needed(&self.inner);
334
335        // Check if we are higher priority than anything current
336        let highest = if let Some(cur) = exchange.waiting.peek_id() {
337            queue_id > cur
338        } else {
339            true
340        };
341
342        // If we are not, just queue
343        if !highest {
344            if exchange.waiting.push(cx.waker().clone(), queue_id) {
345                self.inner
346                    .metrics
347                    .active_wait_requests
348                    .fetch_add(1, atomic::Ordering::Relaxed);
349            }
350            return Poll::Pending;
351        }
352
353        #[allow(unused_variables)] // `since` is only used when `hdrhistogram` is enabled
354        while let Some(IdlingConn { mut conn, since }) = exchange.available.pop_back() {
355            if !conn.expired() {
356                #[cfg(feature = "hdrhistogram")]
357                self.inner
358                    .metrics
359                    .connection_idle_duration
360                    .lock()
361                    .unwrap()
362                    .saturating_record(since.elapsed().as_micros() as u64);
363                #[cfg(feature = "hdrhistogram")]
364                let metrics = self.metrics();
365                conn.inner.active_since = Instant::now();
366                return Poll::Ready(Ok(GetConnInner::Checking(
367                    async move {
368                        conn.stream_mut()?.check().await?;
369                        #[cfg(feature = "hdrhistogram")]
370                        metrics
371                            .check_duration
372                            .lock()
373                            .unwrap()
374                            .saturating_record(
375                                conn.inner.active_since.elapsed().as_micros() as u64
376                            );
377                        Ok(conn)
378                    }
379                    .boxed(),
380                )));
381            } else {
382                self.send_to_recycler(conn);
383            }
384        }
385
386        self.inner
387            .metrics
388            .connections_in_pool
389            .store(exchange.available.len(), atomic::Ordering::Relaxed);
390
391        // we didn't _immediately_ get one -- try to make one
392        // we first try to just do a load so we don't do an unnecessary add then sub
393        if exchange.exist < self.opts.pool_opts().constraints().max() {
394            // we are allowed to make a new connection, so we will!
395            exchange.exist += 1;
396
397            self.inner
398                .metrics
399                .connection_count
400                .store(exchange.exist, atomic::Ordering::Relaxed);
401
402            let opts = self.opts.clone();
403            #[cfg(feature = "hdrhistogram")]
404            let metrics = self.metrics();
405
406            return Poll::Ready(Ok(GetConnInner::Connecting(
407                async move {
408                    let conn = Conn::new(opts).await;
409                    #[cfg(feature = "hdrhistogram")]
410                    if let Ok(conn) = &conn {
411                        metrics
412                            .connect_duration
413                            .lock()
414                            .unwrap()
415                            .saturating_record(
416                                conn.inner.active_since.elapsed().as_micros() as u64
417                            );
418                    }
419                    conn
420                }
421                .boxed(),
422            )));
423        }
424
425        // Polled, but no conn available? Back into the queue.
426        if exchange.waiting.push(cx.waker().clone(), queue_id) {
427            self.inner
428                .metrics
429                .active_wait_requests
430                .fetch_add(1, atomic::Ordering::Relaxed);
431        }
432        Poll::Pending
433    }
434
435    fn unqueue(&self, queue_id: QueueId) {
436        let mut exchange = self.inner.exchange.lock().unwrap();
437        if exchange.waiting.remove(queue_id) {
438            self.inner
439                .metrics
440                .active_wait_requests
441                .fetch_sub(1, atomic::Ordering::Relaxed);
442        }
443    }
444}
445
446impl Drop for Conn {
447    fn drop(&mut self) {
448        self.inner.infile_handler = None;
449
450        if std::thread::panicking() {
451            // Try to decrease the number of existing connections.
452            if let Some(pool) = self.inner.pool.take() {
453                pool.cancel_connection();
454            }
455
456            return;
457        }
458
459        if let Some(mut pool) = self.inner.pool.take() {
460            pool.return_conn(self.take());
461        } else if self.inner.stream.is_some() && !self.inner.disconnected {
462            crate::conn::disconnect(self.take());
463        }
464    }
465}
466
467#[cfg(test)]
468mod test {
469    use futures_util::{
470        future::{join_all, select, select_all, try_join_all, Either},
471        poll, try_join, FutureExt,
472    };
473    use tokio::time::{sleep, timeout};
474    use waker_fn::waker_fn;
475
476    use std::{
477        cmp::Reverse,
478        future::Future,
479        pin::pin,
480        sync::{Arc, OnceLock},
481        task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
482        time::Duration,
483    };
484
485    use crate::{
486        conn::pool::{Pool, QueueId, Waitlist, QUEUE_END_ID},
487        opts::PoolOpts,
488        prelude::*,
489        test_misc::get_opts,
490        PoolConstraints, Row, TxOpts, Value,
491    };
492
493    macro_rules! conn_ex_field {
494        ($conn:expr, $field:tt) => {
495            ex_field!($conn.inner.pool.as_ref().unwrap(), $field)
496        };
497    }
498
499    macro_rules! ex_field {
500        ($pool:expr, $field:tt) => {
501            $pool.inner.exchange.lock().unwrap().$field
502        };
503    }
504
505    fn pool_with_one_connection() -> Pool {
506        let pool_opts = PoolOpts::new().with_constraints(PoolConstraints::new(1, 1).unwrap());
507        let opts = get_opts().pool_opts(pool_opts.clone());
508        Pool::new(opts)
509    }
510
511    #[tokio::test]
512    async fn should_opt_out_of_connection_reset() -> super::Result<()> {
513        let pool_opts = PoolOpts::new().with_constraints(PoolConstraints::new(1, 1).unwrap());
514        let opts = get_opts().pool_opts(pool_opts.clone());
515
516        let pool = Pool::new(opts.clone());
517
518        let mut conn = pool.get_conn().await.unwrap();
519        assert_eq!(
520            conn.query_first::<Value, _>("SELECT @foo").await?.unwrap(),
521            Value::NULL
522        );
523        conn.query_drop("SET @foo = 'foo'").await?;
524        assert_eq!(
525            conn.query_first::<String, _>("SELECT @foo").await?.unwrap(),
526            "foo",
527        );
528        drop(conn);
529
530        conn = pool.get_conn().await.unwrap();
531        assert_eq!(
532            conn.query_first::<Value, _>("SELECT @foo").await?.unwrap(),
533            Value::NULL
534        );
535        conn.query_drop("SET @foo = 'foo'").await?;
536        conn.reset_connection(false);
537        drop(conn);
538
539        conn = pool.get_conn().await.unwrap();
540        assert_eq!(
541            conn.query_first::<String, _>("SELECT @foo").await?.unwrap(),
542            "foo",
543        );
544        drop(conn);
545        pool.disconnect().await.unwrap();
546
547        let pool = Pool::new(opts.pool_opts(pool_opts.with_reset_connection(false)));
548        conn = pool.get_conn().await.unwrap();
549        conn.query_drop("SET @foo = 'foo'").await?;
550        drop(conn);
551        conn = pool.get_conn().await.unwrap();
552        assert_eq!(
553            conn.query_first::<String, _>("SELECT @foo").await?.unwrap(),
554            "foo",
555        );
556        drop(conn);
557        pool.disconnect().await
558    }
559
560    #[test]
561    fn should_not_hang() -> super::Result<()> {
562        pub struct Database {
563            pool: Pool,
564        }
565
566        impl Database {
567            pub async fn disconnect(self) -> super::Result<()> {
568                self.pool.disconnect().await?;
569                Ok(())
570            }
571        }
572
573        let runtime = tokio::runtime::Runtime::new().unwrap();
574        let database = Database {
575            pool: Pool::new(get_opts()),
576        };
577        runtime.block_on(database.disconnect())
578    }
579
580    #[tokio::test]
581    async fn should_track_conn_if_disconnected_outside_of_a_pool() -> super::Result<()> {
582        let pool = Pool::new(get_opts());
583        let conn = pool.get_conn().await?;
584        conn.disconnect().await?;
585        pool.disconnect().await?;
586        Ok(())
587    }
588
589    #[tokio::test]
590    async fn should_connect() -> super::Result<()> {
591        let pool = Pool::new(crate::Opts::from(get_opts()));
592        pool.get_conn().await?.ping().await?;
593        pool.disconnect().await?;
594        Ok(())
595    }
596
597    #[tokio::test]
598    async fn should_reconnect() -> super::Result<()> {
599        let mut master = crate::Conn::new(get_opts()).await?;
600
601        async fn test(master: &mut crate::Conn, opts: crate::OptsBuilder) -> super::Result<()> {
602            const NUM_CONNS: usize = 5;
603            let pool = Pool::new(opts);
604
605            // create some conns..
606            let connections = (0..NUM_CONNS).map(|_| {
607                async {
608                    let mut conn = pool.get_conn().await?;
609                    conn.ping().await?;
610                    crate::Result::Ok(conn)
611                }
612                .boxed()
613            });
614
615            // collect ids..
616            let ids = try_join_all(connections)
617                .await?
618                .into_iter()
619                .map(|conn| conn.id())
620                .collect::<Vec<_>>();
621
622            // give some time to reset connections
623            sleep(Duration::from_millis(1000)).await;
624
625            // get_conn should work if connection is available and alive
626            pool.get_conn().await?;
627
628            // now we'll kill connections..
629            for id in ids {
630                master.query_drop(format!("KILL {}", id)).await?;
631            }
632
633            // now check, that they're still in the pool..
634            assert_eq!(ex_field!(pool, available).len(), NUM_CONNS);
635
636            sleep(Duration::from_millis(500)).await;
637
638            // now get new connection..
639            let _conn = pool.get_conn().await?;
640
641            // now check, that broken connections are dropped
642            assert_eq!(ex_field!(pool, available).len(), 0);
643
644            drop(_conn);
645            pool.disconnect().await
646        }
647
648        println!("Check socket/pipe..");
649        test(&mut master, get_opts()).await?;
650
651        println!("Check tcp..");
652        test(&mut master, get_opts().prefer_socket(false)).await?;
653
654        master.disconnect().await
655    }
656
657    #[tokio::test]
658    async fn should_reuse_connections() -> super::Result<()> {
659        let pool = pool_with_one_connection();
660        let mut conn = pool.get_conn().await?;
661
662        let server_version = conn.server_version();
663        let connection_id = conn.id();
664
665        for _ in 0..16 {
666            drop(conn);
667            conn = pool.get_conn().await?;
668            println!("CONN connection_id={}", conn.id());
669            assert!(conn.id() == connection_id || server_version < (5, 7, 2));
670        }
671
672        Ok(())
673    }
674
675    #[tokio::test]
676    #[ignore]
677    async fn can_handle_the_pressure() {
678        let pool = Pool::new(get_opts());
679        for _ in 0..10i32 {
680            let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
681            for i in 0..10_000 {
682                let pool = pool.clone();
683                let tx = tx.clone();
684                tokio::spawn(async move {
685                    let _ = pool.get_conn().await.unwrap();
686                    tx.send(i).unwrap();
687                });
688            }
689            drop(tx);
690            // see that all the tx's eventually complete
691            while (rx.recv().await).is_some() {}
692        }
693        drop(pool);
694    }
695
696    #[tokio::test]
697    async fn should_start_transaction() -> super::Result<()> {
698        let pool = pool_with_one_connection();
699
700        "CREATE TABLE IF NOT EXISTS mysql.tmp(id int)"
701            .ignore(&pool)
702            .await?;
703        "DELETE FROM mysql.tmp".ignore(&pool).await?;
704
705        let mut tx = pool.start_transaction(TxOpts::default()).await?;
706        tx.exec_batch(
707            "INSERT INTO mysql.tmp (id) VALUES (?)",
708            vec![(1_u8,), (2_u8,)],
709        )
710        .await?;
711        tx.exec_drop("SELECT * FROM mysql.tmp", ()).await?;
712        drop(tx);
713        let row_opt = pool
714            .get_conn()
715            .await?
716            .query_first("SELECT COUNT(*) FROM mysql.tmp")
717            .await?;
718        assert_eq!(row_opt, Some((0u8,)));
719        pool.get_conn()
720            .await?
721            .query_drop("DROP TABLE mysql.tmp")
722            .await?;
723        pool.disconnect().await?;
724        Ok(())
725    }
726
727    #[tokio::test]
728    async fn should_check_inactive_connection_ttl() -> super::Result<()> {
729        const POOL_MIN: usize = 5;
730        const POOL_MAX: usize = 10;
731
732        const INACTIVE_CONNECTION_TTL: Duration = Duration::from_millis(500);
733        const TTL_CHECK_INTERVAL: Duration = Duration::from_secs(1);
734
735        let constraints = PoolConstraints::new(POOL_MIN, POOL_MAX).unwrap();
736        let pool_opts = PoolOpts::default()
737            .with_constraints(constraints)
738            .with_inactive_connection_ttl(INACTIVE_CONNECTION_TTL)
739            .with_ttl_check_interval(TTL_CHECK_INTERVAL);
740
741        let pool = Pool::new(get_opts().pool_opts(pool_opts));
742        let pool_clone = pool.clone();
743        let conns = (0..POOL_MAX).map(|_| pool.get_conn()).collect::<Vec<_>>();
744
745        let conns = try_join_all(conns).await?;
746
747        assert_eq!(ex_field!(pool_clone, exist), POOL_MAX);
748        drop(conns);
749
750        // wait for a bit to let the connections be reclaimed
751        sleep(Duration::from_millis(100)).await;
752
753        // check that connections are still in the pool because of inactive_connection_ttl
754        assert_eq!(ex_field!(pool_clone, available).len(), POOL_MAX);
755
756        // then, wait for ttl_check_interval
757        sleep(TTL_CHECK_INTERVAL).await;
758
759        // wait a bit more to let the connections be reclaimed by the ttl check
760        sleep(Duration::from_millis(500)).await;
761
762        // check that we have the expected number of connections
763        assert_eq!(ex_field!(pool_clone, available).len(), POOL_MIN);
764
765        Ok(())
766    }
767
768    #[tokio::test]
769    async fn aa_should_hold_bounds2() -> super::Result<()> {
770        use std::cmp::min;
771
772        const POOL_MIN: usize = 5;
773        const POOL_MAX: usize = 10;
774
775        let constraints = PoolConstraints::new(POOL_MIN, POOL_MAX).unwrap();
776        let pool_opts = PoolOpts::default().with_constraints(constraints);
777
778        let pool = Pool::new(get_opts().pool_opts(pool_opts));
779        let pool_clone = pool.clone();
780        let conns = (0..POOL_MAX).map(|_| pool.get_conn()).collect::<Vec<_>>();
781
782        let mut conns = try_join_all(conns).await?;
783
784        // we want to continuously drop connections
785        // and check that they are _actually_ dropped until we reach POOL_MIN
786        assert_eq!(ex_field!(pool_clone, exist), POOL_MAX);
787
788        while !conns.is_empty() {
789            // first, drop a connection
790            let _ = conns.pop();
791
792            // then, wait for a bit to let the connection be reclaimed
793            sleep(Duration::from_millis(500)).await;
794
795            // now check that we have the expected # of connections
796            // this may look a little funky, but think of it this way:
797            //
798            //  - if we hold all 10 connections, we expect 10
799            //  - if we drop one,  we still expect 10, because POOL_MIN limits
800            //    the number of _idle_ connections (of which there is only 1)
801            //  - once we've dropped 5, there are now 5 idle connections. thus,
802            //    if we drop one more, we _now_ expect there to be only 9
803            //    connections total (no more connections should be pushed to
804            //    idle).
805            let dropped = POOL_MAX - conns.len();
806            let idle = min(dropped, POOL_MIN);
807            let expected = conns.len() + idle;
808
809            // check that we have the expected number of connections
810            let have = ex_field!(pool_clone, exist);
811            assert_eq!(have, expected);
812        }
813
814        pool.disconnect().await?;
815        Ok(())
816    }
817
818    #[tokio::test]
819    async fn should_hold_bounds1() -> super::Result<()> {
820        let constraints = PoolConstraints::new(1, 2).unwrap();
821        let opts = get_opts().pool_opts(PoolOpts::default().with_constraints(constraints));
822        let pool = Pool::new(opts);
823        let pool_clone = pool.clone();
824
825        let (conn1, conn2) = try_join!(pool.get_conn(), pool.get_conn()).unwrap();
826
827        assert_eq!(conn_ex_field!(conn1, exist), 2);
828        assert_eq!(conn_ex_field!(conn1, available).len(), 0);
829
830        drop(conn1);
831        drop(conn2);
832        // only one of conn1 and conn2 should have gone to idle,
833        // and should have immediately been picked up by new_conn (now conn1)
834        let conn1 = pool_clone.get_conn().await?;
835        assert_eq!(conn_ex_field!(conn1, available).len(), 0);
836
837        drop(conn1);
838
839        // the connection should be returned to idle
840        // (but may not have been returned _yet_)
841        assert!(ex_field!(pool, available).len() <= 1);
842        pool.disconnect().await?;
843        Ok(())
844    }
845
846    // Test that connections which err do not count towards the connection count in the pool.
847    #[tokio::test]
848    async fn should_hold_bounds_on_error() -> super::Result<()> {
849        // Should not be possible to connect to broadcast address.
850        let pool = Pool::new("mysql://255.255.255.255");
851
852        assert!(try_join!(pool.get_conn(), pool.get_conn()).is_err());
853        assert_eq!(ex_field!(pool, exist), 0);
854        Ok(())
855    }
856
857    #[tokio::test]
858    async fn zz_should_check_wait_timeout_on_get_conn() -> super::Result<()> {
859        let pool = Pool::new(get_opts());
860
861        let mut conn = pool.get_conn().await?;
862        let wait_timeout_orig: Option<usize> = conn.query_first("SELECT @@wait_timeout").await?;
863        conn.query_drop("SET GLOBAL wait_timeout = 3").await?;
864        conn.disconnect().await?;
865
866        let mut conn = pool.get_conn().await?;
867        let wait_timeout: Option<usize> = conn.query_first("SELECT @@wait_timeout").await?;
868        let id1: Option<usize> = conn.query_first("SELECT CONNECTION_ID()").await?;
869        drop(conn);
870
871        assert_eq!(wait_timeout, Some(3));
872        assert_eq!(ex_field!(pool, exist), 1);
873
874        sleep(Duration::from_secs(6)).await;
875
876        let mut conn = pool.get_conn().await?;
877        let id2: Option<usize> = conn.query_first("SELECT CONNECTION_ID()").await?;
878        assert_eq!(ex_field!(pool, exist), 1);
879        assert_ne!(id1, id2);
880
881        conn.exec_drop("SET GLOBAL wait_timeout = ?", (wait_timeout_orig,))
882            .await?;
883        drop(conn);
884
885        pool.disconnect().await?;
886
887        Ok(())
888    }
889
890    #[tokio::test]
891    async fn droptest() -> super::Result<()> {
892        let pool = Pool::new(get_opts());
893        let conns = try_join_all((0..10).map(|_| pool.get_conn()))
894            .await
895            .unwrap();
896        drop(conns);
897        drop(pool);
898
899        let pool = Pool::new(get_opts());
900        let conns = try_join_all((0..10).map(|_| pool.get_conn()))
901            .await
902            .unwrap();
903        drop(pool);
904        drop(conns);
905        Ok(())
906    }
907
908    #[test]
909    fn drop_impl_for_conn_should_not_panic_within_unwind() {
910        use tokio::runtime;
911
912        const PANIC_MESSAGE: &str = "ORIGINAL_PANIC";
913
914        let result = std::panic::catch_unwind(|| {
915            runtime::Builder::new_current_thread()
916                .enable_all()
917                .build()
918                .unwrap()
919                .block_on(async {
920                    let pool = Pool::new(get_opts());
921                    let _conn = pool.get_conn().await.unwrap();
922                    std::panic::panic_any(PANIC_MESSAGE);
923                });
924        });
925
926        assert_eq!(
927            *result.unwrap_err().downcast::<&str>().unwrap(),
928            "ORIGINAL_PANIC",
929        );
930    }
931
932    #[test]
933    fn should_not_panic_on_unclean_shutdown() {
934        // run more than once to trigger different drop orders
935        for _ in 0..10 {
936            let rt = tokio::runtime::Runtime::new().unwrap();
937            let (tx, rx) = tokio::sync::oneshot::channel();
938            rt.block_on(async move {
939                let pool = Pool::new(get_opts());
940                let mut c = pool.get_conn().await.unwrap();
941                tokio::spawn(async move {
942                    let _ = rx.await;
943                    let _ = c.query_drop("SELECT 1").await;
944                });
945            });
946            drop(rt);
947            // c is still active here, so if anything it's been forcibly dropped
948            let _ = tx.send(());
949        }
950    }
951
952    #[test]
953    fn should_perform_clean_shutdown() {
954        // run more than once to trigger different drop orders
955        for _ in 0..10 {
956            let rt = tokio::runtime::Runtime::new().unwrap();
957            let (tx, rx) = tokio::sync::oneshot::channel();
958            let jh = rt.spawn(async move {
959                let pool = Pool::new(get_opts());
960                let mut c = pool.get_conn().await.unwrap();
961                tokio::spawn(async move {
962                    let _ = rx.await;
963                    let _ = c.query_drop("SELECT 1").await;
964                });
965                let _ = pool.disconnect().await;
966            });
967            let _ = tx.send(());
968            rt.block_on(jh).unwrap();
969        }
970    }
971
972    #[tokio::test]
973    async fn issue_126_should_cleanup_errors_in_multiresult_sets() -> super::Result<()> {
974        let pool_constraints = PoolConstraints::new(0, 1).unwrap();
975        let pool_opts = PoolOpts::default().with_constraints(pool_constraints);
976
977        let pool = Pool::new(get_opts().pool_opts(pool_opts));
978
979        for _ in 0u8..100 {
980            pool.get_conn()
981                .await?
982                .query_iter("DO '42'; BLABLA;")
983                .await?;
984        }
985
986        Ok(())
987    }
988
989    #[tokio::test]
990    async fn should_ignore_non_fatal_errors_while_returning_to_a_pool() -> super::Result<()> {
991        let pool = pool_with_one_connection();
992        let id = pool.get_conn().await?.id();
993
994        // non-fatal errors are ignored
995        for _ in 0u8..10 {
996            let mut conn = pool.get_conn().await?;
997            conn.query_iter("DO '42'; BLABLA;").await?;
998            assert_eq!(id, conn.id());
999        }
1000
1001        Ok(())
1002    }
1003
1004    #[tokio::test]
1005    async fn should_remove_waker_of_cancelled_task() {
1006        let pool = pool_with_one_connection();
1007        let only_conn = pool.get_conn().await.unwrap();
1008
1009        let join_handle = tokio::spawn(timeout(Duration::from_secs(1), pool.get_conn()));
1010
1011        sleep(Duration::from_secs(2)).await;
1012
1013        match join_handle.await.unwrap() {
1014            Err(_elapsed) => (),
1015            _ => panic!("unexpected Ok()"),
1016        }
1017        drop(only_conn);
1018
1019        assert_eq!(0, pool.inner.exchange.lock().unwrap().waiting.queue.len());
1020        // metrics should catch up with waiting queue (see #335)
1021        assert_eq!(
1022            0,
1023            pool.metrics()
1024                .active_wait_requests
1025                .load(std::sync::atomic::Ordering::Relaxed)
1026        );
1027    }
1028
1029    #[tokio::test]
1030    async fn should_work_if_pooled_connection_operation_is_cancelled() -> super::Result<()> {
1031        let pool = Pool::new(get_opts());
1032
1033        // warm up
1034        join_all((0..10).map(|_| pool.get_conn())).await;
1035
1036        /// some operation
1037        async fn op(pool: &Pool) {
1038            let _: Option<Row> = pool
1039                .get_conn()
1040                .await
1041                .unwrap()
1042                .exec_first("SELECT ?, ?", (42, "foo"))
1043                .await
1044                .unwrap();
1045        }
1046
1047        // Measure the delay
1048        let mut max_delay = 0_u128;
1049        for _ in 0..10_usize {
1050            let start = std::time::Instant::now();
1051            op(&pool).await;
1052            max_delay = std::cmp::max(max_delay, start.elapsed().as_micros());
1053        }
1054
1055        for _ in 0_usize..128 {
1056            let fut = select_all((0_usize..5).map(|_| op(&pool).boxed()));
1057
1058            // we need to cancel the op in the middle
1059            // this should not lead to the `packet out of order` error.
1060            let delay_micros = rand::random::<u128>() % max_delay;
1061            select(
1062                sleep(Duration::from_micros(delay_micros as u64)).boxed(),
1063                fut,
1064            )
1065            .await;
1066
1067            // give some time for connections to return to the pool
1068            sleep(Duration::from_millis(100)).await;
1069        }
1070        Ok(())
1071    }
1072
1073    #[test]
1074    fn waitlist_integrity() {
1075        const DATA: *const () = &();
1076        const NOOP_CLONE_FN: unsafe fn(*const ()) -> RawWaker = |_| RawWaker::new(DATA, &RW_VTABLE);
1077        const NOOP_FN: unsafe fn(*const ()) = |_| {};
1078        static RW_VTABLE: RawWakerVTable =
1079            RawWakerVTable::new(NOOP_CLONE_FN, NOOP_FN, NOOP_FN, NOOP_FN);
1080        let w = unsafe { Waker::from_raw(RawWaker::new(DATA, &RW_VTABLE)) };
1081
1082        let mut waitlist = Waitlist::default();
1083        assert_eq!(0, waitlist.queue.len());
1084
1085        waitlist.push(w.clone(), QueueId(Reverse(4)));
1086        waitlist.push(w.clone(), QueueId(Reverse(2)));
1087        waitlist.push(w.clone(), QueueId(Reverse(8)));
1088        waitlist.push(w.clone(), QUEUE_END_ID);
1089        waitlist.push(w.clone(), QueueId(Reverse(10)));
1090
1091        waitlist.remove(QueueId(Reverse(8)));
1092
1093        assert_eq!(4, waitlist.queue.len());
1094
1095        let (_, id) = waitlist.queue.pop().unwrap();
1096        assert_eq!(2, id.0 .0);
1097        let (_, id) = waitlist.queue.pop().unwrap();
1098        assert_eq!(4, id.0 .0);
1099        let (_, id) = waitlist.queue.pop().unwrap();
1100        assert_eq!(10, id.0 .0);
1101        let (_, id) = waitlist.queue.pop().unwrap();
1102        assert_eq!(QUEUE_END_ID, id);
1103
1104        assert_eq!(0, waitlist.queue.len());
1105    }
1106
1107    #[tokio::test]
1108    async fn check_absolute_connection_ttl() -> super::Result<()> {
1109        let constraints = PoolConstraints::new(1, 3).unwrap();
1110        let pool_opts = PoolOpts::default()
1111            .with_constraints(constraints)
1112            .with_inactive_connection_ttl(Duration::from_secs(99))
1113            .with_ttl_check_interval(Duration::from_secs(1))
1114            .with_abs_conn_ttl(Some(Duration::from_secs(2)));
1115
1116        let pool = Pool::new(get_opts().pool_opts(pool_opts));
1117
1118        let conn_ttl0 = pool.get_conn().await?;
1119        sleep(Duration::from_millis(1000)).await;
1120        let conn_ttl1 = pool.get_conn().await?;
1121        sleep(Duration::from_millis(1000)).await;
1122        let conn_ttl2 = pool.get_conn().await?;
1123
1124        drop(conn_ttl0);
1125        drop(conn_ttl1);
1126        drop(conn_ttl2);
1127        assert_eq!(ex_field!(pool, exist), 3);
1128
1129        sleep(Duration::from_millis(1500)).await;
1130        assert_eq!(ex_field!(pool, exist), 2);
1131
1132        sleep(Duration::from_millis(1000)).await;
1133        assert_eq!(ex_field!(pool, exist), 1);
1134
1135        // Go even below min pool size.
1136        sleep(Duration::from_millis(1000)).await;
1137        assert_eq!(ex_field!(pool, exist), 0);
1138
1139        Ok(())
1140    }
1141
1142    #[tokio::test]
1143    async fn save_last_waker() {
1144        // Test that if passed multiple wakers, we call the last one.
1145
1146        let pool = pool_with_one_connection();
1147
1148        // Get a connection, so we know the next future will be
1149        // queued.
1150        let conn = pool.get_conn().await.unwrap();
1151        let mut pending_fut = pin!(pool.get_conn());
1152
1153        let build_waker = || {
1154            let called = Arc::new(OnceLock::new());
1155            let called2 = called.clone();
1156            let waker = waker_fn(move || called2.set(()).unwrap());
1157            (called, waker)
1158        };
1159
1160        let mut assert_pending = |waker| {
1161            let mut context = Context::from_waker(&waker);
1162            let p = pending_fut.as_mut().poll(&mut context);
1163            assert!(matches!(p, Poll::Pending));
1164        };
1165
1166        let (first_called, waker) = build_waker();
1167        assert_pending(waker);
1168
1169        let (second_called, waker) = build_waker();
1170        assert_pending(waker);
1171
1172        drop(conn);
1173
1174        while second_called.get().is_none() {
1175            assert!(first_called.get().is_none());
1176            tokio::time::sleep(Duration::from_millis(100)).await;
1177        }
1178
1179        assert!(first_called.get().is_none());
1180    }
1181
1182    #[tokio::test]
1183    async fn check_priorities() -> super::Result<()> {
1184        let pool = pool_with_one_connection();
1185
1186        let queue_len = || {
1187            let exchange = pool.inner.exchange.lock().unwrap();
1188            exchange.waiting.queue.len()
1189        };
1190
1191        // Get a connection, so we know the next futures will be
1192        // queued.
1193        let conn = pool.get_conn().await.unwrap();
1194
1195        #[allow(clippy::async_yields_async)]
1196        let get_pending = || async {
1197            let fut = async {
1198                pool.get_conn().await.unwrap();
1199            }
1200            .shared();
1201            let p = poll!(fut.clone());
1202            assert!(matches!(p, Poll::Pending));
1203            fut
1204        };
1205
1206        let fut1 = get_pending().await;
1207        let fut2 = get_pending().await;
1208
1209        // Both futures are queued
1210        assert_eq!(queue_len(), 2);
1211
1212        drop(conn); // This will pop fut1 from the queue, making it [2]
1213        while queue_len() != 1 {
1214            tokio::time::sleep(Duration::from_millis(100)).await;
1215        }
1216
1217        // We called wake on fut1, and even with the select fut1 will
1218        // resolve first
1219        let Either::Right((_, fut2)) = select(fut2, fut1).await else {
1220            panic!("wrong future");
1221        };
1222
1223        // We dropped the connection of fut1, but very likely hasn't
1224        // made it through the recycler yet.
1225        assert_eq!(queue_len(), 1);
1226
1227        let p = poll!(fut2.clone());
1228        assert!(matches!(p, Poll::Pending));
1229        assert_eq!(queue_len(), 1); // The queue still has fut2
1230
1231        // The connection will pass by the recycler and unblock fut2
1232        // and pop it from the queue.
1233        fut2.await;
1234        assert_eq!(queue_len(), 0);
1235
1236        // The recycler is probably not done, so a new future will be
1237        // pending.
1238        let fut3 = get_pending().await;
1239        assert_eq!(queue_len(), 1);
1240
1241        // It is OK to await it.
1242        fut3.await;
1243
1244        Ok(())
1245    }
1246
1247    #[cfg(feature = "nightly")]
1248    mod bench {
1249        use futures_util::future::{FutureExt, TryFutureExt};
1250        use tokio::runtime::Runtime;
1251
1252        use crate::{prelude::Queryable, test_misc::get_opts, Pool, PoolConstraints, PoolOpts};
1253        use std::time::Duration;
1254
1255        #[bench]
1256        fn get_conn(bencher: &mut test::Bencher) {
1257            let mut runtime = Runtime::new().unwrap();
1258            let pool = Pool::new(get_opts());
1259
1260            bencher.iter(|| {
1261                let fut = pool
1262                    .get_conn()
1263                    .and_then(|mut conn| async { conn.ping().await.map(|_| conn) });
1264                runtime.block_on(fut).unwrap();
1265            });
1266
1267            runtime.block_on(pool.disconnect()).unwrap();
1268        }
1269
1270        #[bench]
1271        fn new_conn_on_pool_soft_boundary(bencher: &mut test::Bencher) {
1272            let mut runtime = Runtime::new().unwrap();
1273
1274            let pool_constraints = PoolConstraints::new(0, 1).unwrap();
1275            let pool_opts = PoolOpts::default()
1276                .with_constraints(pool_constraints)
1277                .with_inactive_connection_ttl(Duration::from_secs(1));
1278
1279            let pool = Pool::new(get_opts().pool_opts(pool_opts));
1280
1281            bencher.iter(|| {
1282                let fut = pool.get_conn().map(drop);
1283                runtime.block_on(fut);
1284            });
1285
1286            runtime.block_on(pool.disconnect()).unwrap();
1287        }
1288    }
1289}