1mod builder;
54mod config;
55mod dropguard;
56mod errors;
57mod hooks;
58mod metrics;
59pub mod reexports;
60
61#[deprecated(
62 since = "0.9.1",
63 note = "This module has been deprecated in favor of the dedicated `deadpool-sync` utility crate."
64)]
65pub mod sync;
66
67use std::{
68 collections::VecDeque,
69 convert::TryFrom,
70 fmt,
71 future::Future,
72 marker::PhantomData,
73 ops::{Deref, DerefMut},
74 sync::{
75 atomic::{AtomicUsize, Ordering},
76 Arc, Mutex, Weak,
77 },
78 time::{Duration, Instant},
79};
80
81use async_trait::async_trait;
82use deadpool_runtime::Runtime;
83use retain_mut::RetainMut;
84use tokio::sync::{Semaphore, TryAcquireError};
85
86pub use crate::Status;
87
88use self::dropguard::DropGuard;
89pub use self::{
90 builder::{BuildError, PoolBuilder},
91 config::{CreatePoolError, PoolConfig, Timeouts},
92 errors::{PoolError, RecycleError, TimeoutType},
93 hooks::{Hook, HookError, HookErrorCause, HookFuture, HookResult},
94 metrics::Metrics,
95};
96
97pub type RecycleResult<E> = Result<(), RecycleError<E>>;
99
100#[async_trait]
102pub trait Manager: Sync + Send {
103 type Type;
105 type Error;
108
109 async fn create(&self) -> Result<Self::Type, Self::Error>;
111
112 async fn recycle(&self, obj: &mut Self::Type) -> RecycleResult<Self::Error>;
118
119 fn detach(&self, _obj: &mut Self::Type) {}
126}
127
128#[must_use]
134pub struct Object<M: Manager> {
135 inner: Option<ObjectInner<M>>,
137
138 pool: Weak<PoolInner<M>>,
140}
141
142impl<M> fmt::Debug for Object<M>
143where
144 M: fmt::Debug + Manager,
145 M::Type: fmt::Debug,
146{
147 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
148 f.debug_struct("Object")
149 .field("inner", &self.inner)
150 .finish()
151 }
152}
153
154struct UnreadyObject<'a, M: Manager> {
155 inner: Option<ObjectInner<M>>,
156 pool: &'a PoolInner<M>,
157}
158
159impl<'a, M: Manager> UnreadyObject<'a, M> {
160 fn ready(mut self) -> ObjectInner<M> {
161 self.inner.take().unwrap()
162 }
163}
164
165impl<'a, M: Manager> Drop for UnreadyObject<'a, M> {
166 fn drop(&mut self) {
167 if let Some(mut inner) = self.inner.take() {
168 self.pool.slots.lock().unwrap().size -= 1;
169 self.pool.manager.detach(&mut inner.obj);
170 }
171 }
172}
173
174impl<'a, M: Manager> Deref for UnreadyObject<'a, M> {
175 type Target = ObjectInner<M>;
176 fn deref(&self) -> &Self::Target {
177 self.inner.as_ref().unwrap()
178 }
179}
180
181impl<'a, M: Manager> DerefMut for UnreadyObject<'a, M> {
182 fn deref_mut(&mut self) -> &mut Self::Target {
183 self.inner.as_mut().unwrap()
184 }
185}
186
187#[derive(Debug)]
188pub(crate) struct ObjectInner<M: Manager> {
189 obj: M::Type,
191
192 metrics: Metrics,
194}
195
196impl<M: Manager> Object<M> {
197 #[must_use]
200 pub fn take(mut this: Self) -> M::Type {
201 let mut inner = this.inner.take().unwrap().obj;
202 if let Some(pool) = Object::pool(&this) {
203 pool.inner.detach_object(&mut inner)
204 }
205 inner
206 }
207
208 pub fn metrics(this: &Self) -> &Metrics {
210 &this.inner.as_ref().unwrap().metrics
211 }
212
213 pub fn pool(this: &Self) -> Option<Pool<M>> {
218 this.pool.upgrade().map(|inner| Pool {
219 inner,
220 _wrapper: PhantomData::default(),
221 })
222 }
223}
224
225impl<M: Manager> Drop for Object<M> {
226 fn drop(&mut self) {
227 if let Some(inner) = self.inner.take() {
228 if let Some(pool) = self.pool.upgrade() {
229 pool.return_object(inner)
230 }
231 }
232 }
233}
234
235impl<M: Manager> Deref for Object<M> {
236 type Target = M::Type;
237 fn deref(&self) -> &M::Type {
238 &self.inner.as_ref().unwrap().obj
239 }
240}
241
242impl<M: Manager> DerefMut for Object<M> {
243 fn deref_mut(&mut self) -> &mut Self::Target {
244 &mut self.inner.as_mut().unwrap().obj
245 }
246}
247
248impl<M: Manager> AsRef<M::Type> for Object<M> {
249 fn as_ref(&self) -> &M::Type {
250 self
251 }
252}
253
254impl<M: Manager> AsMut<M::Type> for Object<M> {
255 fn as_mut(&mut self) -> &mut M::Type {
256 self
257 }
258}
259
260pub struct Pool<M: Manager, W: From<Object<M>> = Object<M>> {
265 inner: Arc<PoolInner<M>>,
266 _wrapper: PhantomData<fn() -> W>,
267}
268
269impl<M, W> fmt::Debug for Pool<M, W>
271where
272 M: fmt::Debug + Manager,
273 M::Type: fmt::Debug,
274 W: From<Object<M>>,
275{
276 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
277 f.debug_struct("Pool")
278 .field("inner", &self.inner)
279 .field("wrapper", &self._wrapper)
280 .finish()
281 }
282}
283
284impl<M: Manager, W: From<Object<M>>> Clone for Pool<M, W> {
285 fn clone(&self) -> Self {
286 Self {
287 inner: self.inner.clone(),
288 _wrapper: PhantomData::default(),
289 }
290 }
291}
292
293impl<M: Manager, W: From<Object<M>>> Pool<M, W> {
294 pub fn builder(manager: M) -> PoolBuilder<M, W> {
298 PoolBuilder::new(manager)
299 }
300
301 pub(crate) fn from_builder(builder: PoolBuilder<M, W>) -> Self {
302 Self {
303 inner: Arc::new(PoolInner {
304 manager: Box::new(builder.manager),
305 slots: Mutex::new(Slots {
306 vec: VecDeque::with_capacity(builder.config.max_size),
307 size: 0,
308 max_size: builder.config.max_size,
309 }),
310 users: AtomicUsize::new(0),
311 semaphore: Semaphore::new(builder.config.max_size),
312 config: builder.config,
313 hooks: builder.hooks,
314 runtime: builder.runtime,
315 }),
316 _wrapper: PhantomData::default(),
317 }
318 }
319
320 pub async fn get(&self) -> Result<W, PoolError<M::Error>> {
327 self.timeout_get(&self.timeouts()).await
328 }
329
330 #[deprecated(
338 since = "0.9.3",
339 note = "The name of this method is highly misleading. Please use timeout_get instead. e.g.\n`pool.timeout_get(&Timeouts { wait: Some(Duration::ZERO), ..pool.timeouts() })`"
340 )]
341 pub async fn try_get(&self) -> Result<W, PoolError<M::Error>> {
342 self.timeout_get(&Timeouts {
343 wait: Some(Duration::ZERO),
344 ..self.timeouts()
345 })
346 .await
347 }
348
349 pub async fn timeout_get(&self, timeouts: &Timeouts) -> Result<W, PoolError<M::Error>> {
356 let _ = self.inner.users.fetch_add(1, Ordering::Relaxed);
357 let users_guard = DropGuard(|| {
358 let _ = self.inner.users.fetch_sub(1, Ordering::Relaxed);
359 });
360
361 let non_blocking = match timeouts.wait {
362 Some(t) => t.as_nanos() == 0,
363 None => false,
364 };
365
366 let permit = if non_blocking {
367 self.inner.semaphore.try_acquire().map_err(|e| match e {
368 TryAcquireError::Closed => PoolError::Closed,
369 TryAcquireError::NoPermits => PoolError::Timeout(TimeoutType::Wait),
370 })?
371 } else {
372 apply_timeout(
373 self.inner.runtime,
374 TimeoutType::Wait,
375 timeouts.wait,
376 async {
377 self.inner
378 .semaphore
379 .acquire()
380 .await
381 .map_err(|_| PoolError::Closed)
382 },
383 )
384 .await?
385 };
386
387 let inner_obj = loop {
388 let inner_obj = self.inner.slots.lock().unwrap().vec.pop_front();
389 if let Some(inner_obj) = inner_obj {
390 let mut unready_obj = UnreadyObject {
391 inner: Some(inner_obj),
392 pool: &self.inner,
393 };
394
395 if let Some(_e) = self
397 .inner
398 .hooks
399 .pre_recycle
400 .apply(&mut unready_obj, PoolError::PreRecycleHook)
401 .await?
402 {
403 continue;
404 }
405
406 if apply_timeout(
407 self.inner.runtime,
408 TimeoutType::Recycle,
409 timeouts.recycle,
410 self.inner.manager.recycle(&mut unready_obj.obj),
411 )
412 .await
413 .is_err()
414 {
415 continue;
416 }
417
418 if let Some(_e) = self
420 .inner
421 .hooks
422 .post_recycle
423 .apply(&mut unready_obj, PoolError::PostRecycleHook)
424 .await?
425 {
426 continue;
427 }
428
429 unready_obj.metrics.recycle_count += 1;
430 unready_obj.metrics.recycled = Some(Instant::now());
431
432 break unready_obj.ready();
433 } else {
434 let mut unready_obj = UnreadyObject {
436 inner: Some(ObjectInner {
437 obj: apply_timeout(
438 self.inner.runtime,
439 TimeoutType::Create,
440 timeouts.create,
441 self.inner.manager.create(),
442 )
443 .await?,
444 metrics: Metrics::default(),
445 }),
446 pool: &self.inner,
447 };
448
449 self.inner.slots.lock().unwrap().size += 1;
450
451 if let Some(_e) = self
453 .inner
454 .hooks
455 .post_create
456 .apply(&mut *unready_obj, PoolError::PostCreateHook)
457 .await?
458 {
459 continue;
460 }
461
462 break unready_obj.ready();
463 }
464 };
465
466 users_guard.disarm();
467 permit.forget();
468
469 Ok(Object {
470 inner: Some(inner_obj),
471 pool: Arc::downgrade(&self.inner),
472 }
473 .into())
474 }
475
476 pub fn resize(&self, max_size: usize) {
484 if self.inner.semaphore.is_closed() {
485 return;
486 }
487 let mut slots = self.inner.slots.lock().unwrap();
488 let old_max_size = slots.max_size;
489 slots.max_size = max_size;
490 if max_size < old_max_size {
492 while slots.size > slots.max_size {
493 if let Ok(permit) = self.inner.semaphore.try_acquire() {
494 permit.forget();
495 if slots.vec.pop_front().is_some() {
496 slots.size -= 1;
497 }
498 } else {
499 break;
500 }
501 }
502 let mut vec = VecDeque::with_capacity(max_size);
504 for obj in slots.vec.drain(..) {
505 vec.push_back(obj);
506 }
507 slots.vec = vec;
508 }
509 if max_size > old_max_size {
511 let additional = slots.max_size - slots.size;
512 slots.vec.reserve_exact(additional);
513 self.inner.semaphore.add_permits(additional);
514 }
515 }
516
517 pub fn retain(&self, f: impl Fn(&M::Type, Metrics) -> bool) {
541 let mut guard = self.inner.slots.lock().unwrap();
542 let len_before = guard.vec.len();
543 RetainMut::retain_mut(&mut guard.vec, |obj| {
544 if f(&obj.obj, obj.metrics) {
545 true
546 } else {
547 self.manager().detach(&mut obj.obj);
548 false
549 }
550 });
551 guard.size -= len_before - guard.vec.len();
552 }
553
554 pub fn timeouts(&self) -> Timeouts {
556 self.inner.config.timeouts
557 }
558
559 pub fn close(&self) {
566 self.resize(0);
567 self.inner.semaphore.close();
568 }
569
570 pub fn is_closed(&self) -> bool {
572 self.inner.semaphore.is_closed()
573 }
574
575 #[must_use]
577 pub fn status(&self) -> Status {
578 let slots = self.inner.slots.lock().unwrap();
579 let used = self.inner.users.load(Ordering::Relaxed);
580 let available = isize::try_from(slots.size).unwrap() - isize::try_from(used).unwrap();
581 Status {
582 max_size: slots.max_size,
583 size: slots.size,
584 available,
585 }
586 }
587
588 #[must_use]
590 pub fn manager(&self) -> &M {
591 &*self.inner.manager
592 }
593}
594
595struct PoolInner<M: Manager> {
596 manager: Box<M>,
597 slots: Mutex<Slots<ObjectInner<M>>>,
598 users: AtomicUsize,
602 semaphore: Semaphore,
603 config: PoolConfig,
604 runtime: Option<Runtime>,
605 hooks: hooks::Hooks<M>,
606}
607
608#[derive(Debug)]
609struct Slots<T> {
610 vec: VecDeque<T>,
611 size: usize,
612 max_size: usize,
613}
614
615impl<M> fmt::Debug for PoolInner<M>
617where
618 M: fmt::Debug + Manager,
619 M::Type: fmt::Debug,
620{
621 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
622 f.debug_struct("PoolInner")
623 .field("manager", &self.manager)
624 .field("slots", &self.slots)
625 .field("used", &self.users)
626 .field("semaphore", &self.semaphore)
627 .field("config", &self.config)
628 .field("runtime", &self.runtime)
629 .field("hooks", &self.hooks)
630 .finish()
631 }
632}
633
634impl<M: Manager> PoolInner<M> {
635 fn return_object(&self, mut inner: ObjectInner<M>) {
636 let _ = self.users.fetch_sub(1, Ordering::Relaxed);
637 let mut slots = self.slots.lock().unwrap();
638 if slots.size <= slots.max_size {
639 slots.vec.push_back(inner);
640 drop(slots);
641 self.semaphore.add_permits(1);
642 } else {
643 slots.size -= 1;
644 drop(slots);
645 self.manager.detach(&mut inner.obj);
646 }
647 }
648 fn detach_object(&self, obj: &mut M::Type) {
649 let _ = self.users.fetch_sub(1, Ordering::Relaxed);
650 let mut slots = self.slots.lock().unwrap();
651 let add_permits = slots.size <= slots.max_size;
652 slots.size -= 1;
653 drop(slots);
654 if add_permits {
655 self.semaphore.add_permits(1);
656 }
657 self.manager.detach(obj);
658 }
659}
660
661async fn apply_timeout<O, E>(
662 runtime: Option<Runtime>,
663 timeout_type: TimeoutType,
664 duration: Option<Duration>,
665 future: impl Future<Output = Result<O, impl Into<PoolError<E>>>>,
666) -> Result<O, PoolError<E>> {
667 match (runtime, duration) {
668 (_, None) => future.await.map_err(Into::into),
669 (Some(runtime), Some(duration)) => runtime
670 .timeout(duration, future)
671 .await
672 .ok_or(PoolError::Timeout(timeout_type))?
673 .map_err(Into::into),
674 (None, Some(_)) => Err(PoolError::NoRuntimeSpecified),
675 }
676}