deadpool/managed/
mod.rs

1//! Managed version of the pool.
2//!
3//! "Managed" means that it requires a [`Manager`] which is responsible for
4//! creating and recycling objects as they are needed.
5//!
6//! # Example
7//!
8//! ```rust
9//! use async_trait::async_trait;
10//! use deadpool::managed;
11//!
12//! #[derive(Debug)]
13//! enum Error { Fail }
14//!
15//! struct Computer {}
16//!
17//! impl Computer {
18//!     async fn get_answer(&self) -> i32 {
19//!         42
20//!     }
21//! }
22//!
23//! struct Manager {}
24//!
25//! #[async_trait]
26//! impl managed::Manager for Manager {
27//!     type Type = Computer;
28//!     type Error = Error;
29//!
30//!     async fn create(&self) -> Result<Computer, Error> {
31//!         Ok(Computer {})
32//!     }
33//!     async fn recycle(&self, conn: &mut Computer) -> managed::RecycleResult<Error> {
34//!         Ok(())
35//!     }
36//! }
37//!
38//! type Pool = managed::Pool<Manager>;
39//!
40//! #[tokio::main]
41//! async fn main() {
42//!     let mgr = Manager {};
43//!     let pool = Pool::builder(mgr).max_size(16).build().unwrap();
44//!     let mut conn = pool.get().await.unwrap();
45//!     let answer = conn.get_answer().await;
46//!     assert_eq!(answer, 42);
47//! }
48//! ```
49//!
//! For a more complete example please see the
//! [`deadpool-postgres`](https://crates.io/crates/deadpool-postgres) crate.
52
53mod builder;
54mod config;
55mod dropguard;
56mod errors;
57mod hooks;
58mod metrics;
59pub mod reexports;
60
61#[deprecated(
62    since = "0.9.1",
63    note = "This module has been deprecated in favor of the dedicated `deadpool-sync` utility crate."
64)]
65pub mod sync;
66
67use std::{
68    collections::VecDeque,
69    convert::TryFrom,
70    fmt,
71    future::Future,
72    marker::PhantomData,
73    ops::{Deref, DerefMut},
74    sync::{
75        atomic::{AtomicUsize, Ordering},
76        Arc, Mutex, Weak,
77    },
78    time::{Duration, Instant},
79};
80
81use async_trait::async_trait;
82use deadpool_runtime::Runtime;
83use retain_mut::RetainMut;
84use tokio::sync::{Semaphore, TryAcquireError};
85
86pub use crate::Status;
87
88use self::dropguard::DropGuard;
89pub use self::{
90    builder::{BuildError, PoolBuilder},
91    config::{CreatePoolError, PoolConfig, Timeouts},
92    errors::{PoolError, RecycleError, TimeoutType},
93    hooks::{Hook, HookError, HookErrorCause, HookFuture, HookResult},
94    metrics::Metrics,
95};
96
/// Result type of the [`Manager::recycle()`] method.
///
/// `Ok(())` signals that the object may be handed out again; `Err` carries a
/// [`RecycleError`] parameterised over the manager's error type `E`.
pub type RecycleResult<E> = Result<(), RecycleError<E>>;
99
/// Manager responsible for creating new [`Object`]s or recycling existing ones.
///
/// Implementations must be `Sync + Send` because the [`Pool`] that owns the
/// manager can be cloned and shared across threads.
#[async_trait]
pub trait Manager: Sync + Send {
    /// Type of [`Object`]s that this [`Manager`] creates and recycles.
    type Type;
    /// Error that this [`Manager`] can return when creating and/or recycling
    /// [`Object`]s.
    type Error;

    /// Creates a new instance of [`Manager::Type`].
    ///
    /// # Errors
    ///
    /// Returns [`Manager::Error`] if the instance couldn't be created.
    async fn create(&self) -> Result<Self::Type, Self::Error>;

    /// Tries to recycle an instance of [`Manager::Type`].
    ///
    /// The pool calls this before handing out an object that was previously
    /// returned to it. If recycling fails the pool discards the object and
    /// moves on to the next idle object or creates a new one.
    ///
    /// # Errors
    ///
    /// Returns a [`RecycleError`] (wrapping [`Manager::Error`]) if the
    /// instance couldn't be recycled.
    async fn recycle(&self, obj: &mut Self::Type) -> RecycleResult<Self::Error>;

    /// Detaches an instance of [`Manager::Type`] from this [`Manager`].
    ///
    /// This method is called when using the [`Object::take()`] method for
    /// removing an [`Object`] from a [`Pool`]. If the [`Manager`] doesn't hold
    /// any references to the handed out [`Object`]s then the default
    /// implementation can be used which does nothing.
    fn detach(&self, _obj: &mut Self::Type) {}
}
127
/// Wrapper around the actual pooled object which implements [`Deref`],
/// [`DerefMut`] and [`Drop`] traits.
///
/// Use this object just as if it was of type `T` and upon leaving a scope the
/// [`Drop::drop()`] will take care of returning it to the pool.
#[must_use]
pub struct Object<M: Manager> {
    /// The actual object together with its metrics. This is only `None`
    /// once the value has been moved out by [`Object::take()`] or by the
    /// [`Drop`] implementation.
    inner: Option<ObjectInner<M>>,

    /// Pool to return the pooled object to. Only a [`Weak`] reference is
    /// held, so upgrading it can fail once the pool itself is gone.
    pool: Weak<PoolInner<M>>,
}
141
142impl<M> fmt::Debug for Object<M>
143where
144    M: fmt::Debug + Manager,
145    M::Type: fmt::Debug,
146{
147    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
148        f.debug_struct("Object")
149            .field("inner", &self.inner)
150            .finish()
151    }
152}
153
/// Guard around an object that has been popped from the pool or freshly
/// created but not yet handed out to the caller.
///
/// If the guard is dropped while it still holds the object (i.e. before
/// `ready()` was called) the pool size is decremented and the object is
/// detached from the [`Manager`].
struct UnreadyObject<'a, M: Manager> {
    /// The guarded object; `None` once `ready()` has moved it out.
    inner: Option<ObjectInner<M>>,
    /// Pool whose bookkeeping is corrected when the object is discarded.
    pool: &'a PoolInner<M>,
}
158
159impl<'a, M: Manager> UnreadyObject<'a, M> {
160    fn ready(mut self) -> ObjectInner<M> {
161        self.inner.take().unwrap()
162    }
163}
164
165impl<'a, M: Manager> Drop for UnreadyObject<'a, M> {
166    fn drop(&mut self) {
167        if let Some(mut inner) = self.inner.take() {
168            self.pool.slots.lock().unwrap().size -= 1;
169            self.pool.manager.detach(&mut inner.obj);
170        }
171    }
172}
173
174impl<'a, M: Manager> Deref for UnreadyObject<'a, M> {
175    type Target = ObjectInner<M>;
176    fn deref(&self) -> &Self::Target {
177        self.inner.as_ref().unwrap()
178    }
179}
180
181impl<'a, M: Manager> DerefMut for UnreadyObject<'a, M> {
182    fn deref_mut(&mut self) -> &mut Self::Target {
183        self.inner.as_mut().unwrap()
184    }
185}
186
/// The pooled value together with its bookkeeping data. This is what is
/// actually stored in the pool's queue of idle objects.
#[derive(Debug)]
pub(crate) struct ObjectInner<M: Manager> {
    /// Actual pooled object.
    obj: M::Type,

    /// Object metrics.
    metrics: Metrics,
}
195
196impl<M: Manager> Object<M> {
197    /// Takes this [`Object`] from its [`Pool`] permanently. This reduces the
198    /// size of the [`Pool`].
199    #[must_use]
200    pub fn take(mut this: Self) -> M::Type {
201        let mut inner = this.inner.take().unwrap().obj;
202        if let Some(pool) = Object::pool(&this) {
203            pool.inner.detach_object(&mut inner)
204        }
205        inner
206    }
207
208    /// Get object statistics
209    pub fn metrics(this: &Self) -> &Metrics {
210        &this.inner.as_ref().unwrap().metrics
211    }
212
213    /// Returns the [`Pool`] this [`Object`] belongs to.
214    ///
215    /// Since [`Object`]s only hold a [`Weak`] reference to the [`Pool`] they
216    /// come from, this can fail and return [`None`] instead.
217    pub fn pool(this: &Self) -> Option<Pool<M>> {
218        this.pool.upgrade().map(|inner| Pool {
219            inner,
220            _wrapper: PhantomData::default(),
221        })
222    }
223}
224
225impl<M: Manager> Drop for Object<M> {
226    fn drop(&mut self) {
227        if let Some(inner) = self.inner.take() {
228            if let Some(pool) = self.pool.upgrade() {
229                pool.return_object(inner)
230            }
231        }
232    }
233}
234
235impl<M: Manager> Deref for Object<M> {
236    type Target = M::Type;
237    fn deref(&self) -> &M::Type {
238        &self.inner.as_ref().unwrap().obj
239    }
240}
241
242impl<M: Manager> DerefMut for Object<M> {
243    fn deref_mut(&mut self) -> &mut Self::Target {
244        &mut self.inner.as_mut().unwrap().obj
245    }
246}
247
248impl<M: Manager> AsRef<M::Type> for Object<M> {
249    fn as_ref(&self) -> &M::Type {
250        self
251    }
252}
253
254impl<M: Manager> AsMut<M::Type> for Object<M> {
255    fn as_mut(&mut self) -> &mut M::Type {
256        self
257    }
258}
259
/// Generic object and connection pool.
///
/// This struct can be cloned and transferred across thread boundaries and uses
/// reference counting for its internal state.
///
/// The type parameter `W` is the wrapper type handed out by the `get`
/// methods; it is built from an [`Object`] via [`From`] and defaults to
/// [`Object`] itself.
pub struct Pool<M: Manager, W: From<Object<M>> = Object<M>> {
    /// Reference-counted state shared by all clones of this pool.
    inner: Arc<PoolInner<M>>,
    /// Marker for the wrapper type `W`; no value of `W` is stored.
    _wrapper: PhantomData<fn() -> W>,
}
268
269// Implemented manually to avoid unnecessary trait bound on `W` type parameter.
270impl<M, W> fmt::Debug for Pool<M, W>
271where
272    M: fmt::Debug + Manager,
273    M::Type: fmt::Debug,
274    W: From<Object<M>>,
275{
276    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
277        f.debug_struct("Pool")
278            .field("inner", &self.inner)
279            .field("wrapper", &self._wrapper)
280            .finish()
281    }
282}
283
284impl<M: Manager, W: From<Object<M>>> Clone for Pool<M, W> {
285    fn clone(&self) -> Self {
286        Self {
287            inner: self.inner.clone(),
288            _wrapper: PhantomData::default(),
289        }
290    }
291}
292
293impl<M: Manager, W: From<Object<M>>> Pool<M, W> {
294    /// Instantiates a builder for a new [`Pool`].
295    ///
296    /// This is the only way to create a [`Pool`] instance.
297    pub fn builder(manager: M) -> PoolBuilder<M, W> {
298        PoolBuilder::new(manager)
299    }
300
301    pub(crate) fn from_builder(builder: PoolBuilder<M, W>) -> Self {
302        Self {
303            inner: Arc::new(PoolInner {
304                manager: Box::new(builder.manager),
305                slots: Mutex::new(Slots {
306                    vec: VecDeque::with_capacity(builder.config.max_size),
307                    size: 0,
308                    max_size: builder.config.max_size,
309                }),
310                users: AtomicUsize::new(0),
311                semaphore: Semaphore::new(builder.config.max_size),
312                config: builder.config,
313                hooks: builder.hooks,
314                runtime: builder.runtime,
315            }),
316            _wrapper: PhantomData::default(),
317        }
318    }
319
320    /// Retrieves an [`Object`] from this [`Pool`] or waits for one to
321    /// become available.
322    ///
323    /// # Errors
324    ///
325    /// See [`PoolError`] for details.
326    pub async fn get(&self) -> Result<W, PoolError<M::Error>> {
327        self.timeout_get(&self.timeouts()).await
328    }
329
330    /// Retrieves an [`Object`] from this [`Pool`] and doesn't wait if there is
331    /// currently no [`Object`] available and the maximum [`Pool`] size has
332    /// been reached.
333    ///
334    /// # Errors
335    ///
336    /// See [`PoolError`] for details.
337    #[deprecated(
338        since = "0.9.3",
339        note = "The name of this method is highly misleading. Please use timeout_get instead. e.g.\n`pool.timeout_get(&Timeouts { wait: Some(Duration::ZERO), ..pool.timeouts() })`"
340    )]
341    pub async fn try_get(&self) -> Result<W, PoolError<M::Error>> {
342        self.timeout_get(&Timeouts {
343            wait: Some(Duration::ZERO),
344            ..self.timeouts()
345        })
346        .await
347    }
348
349    /// Retrieves an [`Object`] from this [`Pool`] using a different `timeout`
350    /// than the configured one.
351    ///
352    /// # Errors
353    ///
354    /// See [`PoolError`] for details.
355    pub async fn timeout_get(&self, timeouts: &Timeouts) -> Result<W, PoolError<M::Error>> {
356        let _ = self.inner.users.fetch_add(1, Ordering::Relaxed);
357        let users_guard = DropGuard(|| {
358            let _ = self.inner.users.fetch_sub(1, Ordering::Relaxed);
359        });
360
361        let non_blocking = match timeouts.wait {
362            Some(t) => t.as_nanos() == 0,
363            None => false,
364        };
365
366        let permit = if non_blocking {
367            self.inner.semaphore.try_acquire().map_err(|e| match e {
368                TryAcquireError::Closed => PoolError::Closed,
369                TryAcquireError::NoPermits => PoolError::Timeout(TimeoutType::Wait),
370            })?
371        } else {
372            apply_timeout(
373                self.inner.runtime,
374                TimeoutType::Wait,
375                timeouts.wait,
376                async {
377                    self.inner
378                        .semaphore
379                        .acquire()
380                        .await
381                        .map_err(|_| PoolError::Closed)
382                },
383            )
384            .await?
385        };
386
387        let inner_obj = loop {
388            let inner_obj = self.inner.slots.lock().unwrap().vec.pop_front();
389            if let Some(inner_obj) = inner_obj {
390                let mut unready_obj = UnreadyObject {
391                    inner: Some(inner_obj),
392                    pool: &self.inner,
393                };
394
395                // Apply pre_recycle hooks
396                if let Some(_e) = self
397                    .inner
398                    .hooks
399                    .pre_recycle
400                    .apply(&mut unready_obj, PoolError::PreRecycleHook)
401                    .await?
402                {
403                    continue;
404                }
405
406                if apply_timeout(
407                    self.inner.runtime,
408                    TimeoutType::Recycle,
409                    timeouts.recycle,
410                    self.inner.manager.recycle(&mut unready_obj.obj),
411                )
412                .await
413                .is_err()
414                {
415                    continue;
416                }
417
418                // Apply post_recycle hooks
419                if let Some(_e) = self
420                    .inner
421                    .hooks
422                    .post_recycle
423                    .apply(&mut unready_obj, PoolError::PostRecycleHook)
424                    .await?
425                {
426                    continue;
427                }
428
429                unready_obj.metrics.recycle_count += 1;
430                unready_obj.metrics.recycled = Some(Instant::now());
431
432                break unready_obj.ready();
433            } else {
434                // Create new object
435                let mut unready_obj = UnreadyObject {
436                    inner: Some(ObjectInner {
437                        obj: apply_timeout(
438                            self.inner.runtime,
439                            TimeoutType::Create,
440                            timeouts.create,
441                            self.inner.manager.create(),
442                        )
443                        .await?,
444                        metrics: Metrics::default(),
445                    }),
446                    pool: &self.inner,
447                };
448
449                self.inner.slots.lock().unwrap().size += 1;
450
451                // Apply post_create hooks
452                if let Some(_e) = self
453                    .inner
454                    .hooks
455                    .post_create
456                    .apply(&mut *unready_obj, PoolError::PostCreateHook)
457                    .await?
458                {
459                    continue;
460                }
461
462                break unready_obj.ready();
463            }
464        };
465
466        users_guard.disarm();
467        permit.forget();
468
469        Ok(Object {
470            inner: Some(inner_obj),
471            pool: Arc::downgrade(&self.inner),
472        }
473        .into())
474    }
475
476    /**
477     * Resize the pool. This change the `max_size` of the pool dropping
478     * excess objects and/or making space for new ones.
479     *
480     * If the pool is closed this method does nothing. The [`Pool::status`] method
481     * always reports a `max_size` of 0 for closed pools.
482     */
483    pub fn resize(&self, max_size: usize) {
484        if self.inner.semaphore.is_closed() {
485            return;
486        }
487        let mut slots = self.inner.slots.lock().unwrap();
488        let old_max_size = slots.max_size;
489        slots.max_size = max_size;
490        // shrink pool
491        if max_size < old_max_size {
492            while slots.size > slots.max_size {
493                if let Ok(permit) = self.inner.semaphore.try_acquire() {
494                    permit.forget();
495                    if slots.vec.pop_front().is_some() {
496                        slots.size -= 1;
497                    }
498                } else {
499                    break;
500                }
501            }
502            // Create a new VecDeque with a smaller capacity
503            let mut vec = VecDeque::with_capacity(max_size);
504            for obj in slots.vec.drain(..) {
505                vec.push_back(obj);
506            }
507            slots.vec = vec;
508        }
509        // grow pool
510        if max_size > old_max_size {
511            let additional = slots.max_size - slots.size;
512            slots.vec.reserve_exact(additional);
513            self.inner.semaphore.add_permits(additional);
514        }
515    }
516
517    /// Retains only the objects specified by the given function.
518    ///
519    /// This function is typically used to remove objects from
520    /// the pool based on their current state or metrics.
521    ///
522    /// **Caution:** This function blocks the entire pool while
523    /// it is running. Therefore the given function should not
524    /// block.
525    ///
526    /// The following example starts a background task that
527    /// runs every 30 seconds and removes objects from the pool
528    /// that haven't been used for more than one minute.
529    ///
530    /// ```rust,ignore
531    /// let interval = Duration::from_secs(30);
532    /// let max_age = Duration::from_secs(60);
533    /// tokio::spawn(async move {
534    ///     loop {
535    ///         tokio::time::sleep(interval).await;
536    ///         pool.retain(|_, metrics| metrics.last_used() < max_age);
537    ///     }
538    /// });
539    /// ```
540    pub fn retain(&self, f: impl Fn(&M::Type, Metrics) -> bool) {
541        let mut guard = self.inner.slots.lock().unwrap();
542        let len_before = guard.vec.len();
543        RetainMut::retain_mut(&mut guard.vec, |obj| {
544            if f(&obj.obj, obj.metrics) {
545                true
546            } else {
547                self.manager().detach(&mut obj.obj);
548                false
549            }
550        });
551        guard.size -= len_before - guard.vec.len();
552    }
553
554    /// Get current timeout configuration
555    pub fn timeouts(&self) -> Timeouts {
556        self.inner.config.timeouts
557    }
558
559    /// Closes this [`Pool`].
560    ///
561    /// All current and future tasks waiting for [`Object`]s will return
562    /// [`PoolError::Closed`] immediately.
563    ///
564    /// This operation resizes the pool to 0.
565    pub fn close(&self) {
566        self.resize(0);
567        self.inner.semaphore.close();
568    }
569
570    /// Indicates whether this [`Pool`] has been closed.
571    pub fn is_closed(&self) -> bool {
572        self.inner.semaphore.is_closed()
573    }
574
575    /// Retrieves [`Status`] of this [`Pool`].
576    #[must_use]
577    pub fn status(&self) -> Status {
578        let slots = self.inner.slots.lock().unwrap();
579        let used = self.inner.users.load(Ordering::Relaxed);
580        let available = isize::try_from(slots.size).unwrap() - isize::try_from(used).unwrap();
581        Status {
582            max_size: slots.max_size,
583            size: slots.size,
584            available,
585        }
586    }
587
588    /// Returns [`Manager`] of this [`Pool`].
589    #[must_use]
590    pub fn manager(&self) -> &M {
591        &*self.inner.manager
592    }
593}
594
/// State shared by all clones of a [`Pool`] and referenced weakly by the
/// [`Object`]s handed out from it.
struct PoolInner<M: Manager> {
    /// Manager used for creating, recycling and detaching objects.
    manager: Box<M>,
    /// Queue of idle objects together with the size bookkeeping.
    slots: Mutex<Slots<ObjectInner<M>>>,
    /// Number of users of the [`Pool`]. It is incremented at the start of
    /// every `timeout_get` call and decremented again when that call fails
    /// or when the handed out [`Object`] is returned or detached.
    /// `Pool::status` subtracts it from the pool size to compute the number
    /// of available [`Object`]s.
    users: AtomicUsize,
    /// Limits how many [`Object`]s can be handed out at the same time; a
    /// closed semaphore marks the [`Pool`] as closed.
    semaphore: Semaphore,
    /// Configuration the pool was built with (e.g. the default timeouts).
    config: PoolConfig,
    /// Runtime used for applying timeouts; timeouts can't be applied
    /// without one.
    runtime: Option<Runtime>,
    /// Hooks run after creating and before/after recycling objects.
    hooks: hooks::Hooks<M>,
}
607
/// Bookkeeping data of a pool which is guarded by a single mutex.
#[derive(Debug)]
struct Slots<T> {
    /// Idle objects waiting to be handed out, oldest at the front.
    vec: VecDeque<T>,
    /// Number of objects that currently belong to the pool. It is
    /// incremented when an object is created and decremented when one is
    /// discarded or detached.
    size: usize,
    /// Configured upper limit for `size`.
    max_size: usize,
}
614
615// Implemented manually to avoid unnecessary trait bound on the struct.
616impl<M> fmt::Debug for PoolInner<M>
617where
618    M: fmt::Debug + Manager,
619    M::Type: fmt::Debug,
620{
621    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
622        f.debug_struct("PoolInner")
623            .field("manager", &self.manager)
624            .field("slots", &self.slots)
625            .field("used", &self.users)
626            .field("semaphore", &self.semaphore)
627            .field("config", &self.config)
628            .field("runtime", &self.runtime)
629            .field("hooks", &self.hooks)
630            .finish()
631    }
632}
633
634impl<M: Manager> PoolInner<M> {
635    fn return_object(&self, mut inner: ObjectInner<M>) {
636        let _ = self.users.fetch_sub(1, Ordering::Relaxed);
637        let mut slots = self.slots.lock().unwrap();
638        if slots.size <= slots.max_size {
639            slots.vec.push_back(inner);
640            drop(slots);
641            self.semaphore.add_permits(1);
642        } else {
643            slots.size -= 1;
644            drop(slots);
645            self.manager.detach(&mut inner.obj);
646        }
647    }
648    fn detach_object(&self, obj: &mut M::Type) {
649        let _ = self.users.fetch_sub(1, Ordering::Relaxed);
650        let mut slots = self.slots.lock().unwrap();
651        let add_permits = slots.size <= slots.max_size;
652        slots.size -= 1;
653        drop(slots);
654        if add_permits {
655            self.semaphore.add_permits(1);
656        }
657        self.manager.detach(obj);
658    }
659}
660
661async fn apply_timeout<O, E>(
662    runtime: Option<Runtime>,
663    timeout_type: TimeoutType,
664    duration: Option<Duration>,
665    future: impl Future<Output = Result<O, impl Into<PoolError<E>>>>,
666) -> Result<O, PoolError<E>> {
667    match (runtime, duration) {
668        (_, None) => future.await.map_err(Into::into),
669        (Some(runtime), Some(duration)) => runtime
670            .timeout(duration, future)
671            .await
672            .ok_or(PoolError::Timeout(timeout_type))?
673            .map_err(Into::into),
674        (None, Some(_)) => Err(PoolError::NoRuntimeSpecified),
675    }
676}