deadpool/managed/
sync.rs

1//! Helpers for writing pools for objects that don't support async and need to
2//! be run inside a thread.
3
4pub mod reexports {
5    //! This module contains all things that should be reexported
6    //! by backend implementations in order to avoid direct
7    //! dependencies on the `deadpool` crate itself.
8    //!
9    //! This module is the variant that should be used by *sync*
10    //! backends.
11    //!
12    //! Crates based on `deadpool::managed::sync` should include this line:
13    //! ```rust,ignore
14    //! pub use deadpool::managed::sync::reexports::*;
15    //! deadpool::managed_reexports!(
16    //!     "name_of_crate",
17    //!     Manager,
18    //!     Object<Manager>,
19    //!     Error,
20    //!     ConfigError
21    //! );
22    //! ```
23
24    pub use super::super::reexports::*;
25    pub use super::{InteractError, SyncGuard};
26}
27
28use std::{
29    any::Any,
30    fmt,
31    marker::PhantomData,
32    ops::{Deref, DerefMut},
33    sync::{Arc, Mutex, MutexGuard, PoisonError, TryLockError},
34};
35
36use crate::{Runtime, SpawnBlockingError};
37
/// Errors that [`SyncWrapper::interact()`] can fail with.
#[derive(Debug)]
pub enum InteractError<E> {
    /// The supplied callback panicked. The boxed value is the payload that
    /// was reported for that panic.
    Panic(Box<dyn Any + Send + 'static>),

    /// The callback was aborted.
    Aborted,

    /// The backend itself returned an error.
    Backend(E),
}

impl<E: fmt::Display> fmt::Display for InteractError<E> {
    /// Writes a short description of the error; only the `Backend` variant
    /// includes details (the display output of the wrapped error).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Backend(err) => write!(f, "Backend error: {}", err),
            Self::Aborted => f.write_str("Aborted"),
            Self::Panic(_) => f.write_str("Panic"),
        }
    }
}

impl<E: std::error::Error + 'static> std::error::Error for InteractError<E> {
    /// Exposes the wrapped backend error as the source; the other variants
    /// have no underlying error.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        if let Self::Backend(inner) = self {
            Some(inner)
        } else {
            None
        }
    }
}
69
70/// Wrapper for objects which only provides blocking functions that need to be
71/// called on a separate thread.
72///
73/// Access to the wrapped object is provided via the [`SyncWrapper::interact()`]
74/// method.
75#[must_use]
76pub struct SyncWrapper<T, E>
77where
78    T: Send + 'static,
79    E: Send + 'static,
80{
81    obj: Arc<Mutex<Option<T>>>,
82    runtime: Runtime,
83    _error: PhantomData<fn() -> E>,
84}
85
86// Implemented manually to avoid unnecessary trait bound on `E` type parameter.
87impl<T, E> fmt::Debug for SyncWrapper<T, E>
88where
89    T: fmt::Debug + Send + 'static,
90    E: Send + 'static,
91{
92    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93        f.debug_struct("SyncWrapper")
94            .field("obj", &self.obj)
95            .field("runtime", &self.runtime)
96            .field("_error", &self._error)
97            .finish()
98    }
99}
100
101impl<T, E> SyncWrapper<T, E>
102where
103    T: Send + 'static,
104    E: Send + 'static,
105{
106    /// Creates a new wrapped object.
107    pub async fn new<F>(runtime: Runtime, f: F) -> Result<Self, E>
108    where
109        F: FnOnce() -> Result<T, E> + Send + 'static,
110    {
111        let result = match runtime.spawn_blocking(f).await {
112            // FIXME: Panicking when the creation panics is not nice.
113            // In order to handle this properly the Manager::create
114            // methods needs to support a custom error enum which
115            // supports a Panic variant.
116            Err(SpawnBlockingError::Panic(e)) => panic!("{:?}", e),
117            Ok(obj) => obj,
118        };
119        result.map(|obj| Self {
120            obj: Arc::new(Mutex::new(Some(obj))),
121            runtime,
122            _error: PhantomData::default(),
123        })
124    }
125
126    /// Interacts with the underlying object.
127    ///
128    /// Expects a closure that takes the object as its parameter.
129    /// The closure is executed in a separate thread so that the async runtime
130    /// is not blocked.
131    pub async fn interact<F, R>(&self, f: F) -> Result<R, InteractError<E>>
132    where
133        F: FnOnce(&mut T) -> Result<R, E> + Send + 'static,
134        R: Send + 'static,
135    {
136        let arc = self.obj.clone();
137        self.runtime
138            .spawn_blocking(move || {
139                let mut guard = arc.lock().unwrap();
140                let conn = guard.as_mut().unwrap();
141                f(conn)
142            })
143            .await
144            .map_err(|e| match e {
145                SpawnBlockingError::Panic(p) => InteractError::Panic(p),
146            })?
147            .map_err(InteractError::Backend)
148    }
149
150    /// Indicates whether the underlying [`Mutex`] has been poisoned.
151    ///
152    /// This happens when a panic occurs while interacting with the object.
153    pub fn is_mutex_poisoned(&self) -> bool {
154        self.obj.is_poisoned()
155    }
156
157    /// Lock the underlying mutex and return a guard for the inner
158    /// object.
159    pub fn lock(&self) -> Result<SyncGuard<'_, T>, PoisonError<MutexGuard<'_, Option<T>>>> {
160        self.obj.lock().map(SyncGuard)
161    }
162
163    /// Try to lock the underlying mutex and return a guard for the
164    /// inner object.
165    pub fn try_lock(&self) -> Result<SyncGuard<'_, T>, TryLockError<MutexGuard<'_, Option<T>>>> {
166        self.obj.try_lock().map(SyncGuard)
167    }
168}
169
170impl<T, E> Drop for SyncWrapper<T, E>
171where
172    T: Send + 'static,
173    E: Send + 'static,
174{
175    fn drop(&mut self) {
176        let arc = self.obj.clone();
177        // Drop the `rusqlite::Connection` inside a `spawn_blocking`
178        // as the `drop` function of it can block.
179        self.runtime
180            .spawn_blocking_background(move || match arc.lock() {
181                Ok(mut guard) => drop(guard.take()),
182                Err(e) => drop(e.into_inner().take()),
183            })
184            .unwrap();
185    }
186}
187
/// Guard handed out by `SyncWrapper::lock` and `SyncWrapper::try_lock`.
/// It is basically just a wrapper around a `MutexGuard` which hides some
/// implementation details.
///
/// **Important:** Any blocking operation using this object
/// should be executed on a separate thread (e.g. via `spawn_blocking`).
#[derive(Debug)]
pub struct SyncGuard<'a, T: Send>(MutexGuard<'a, Option<T>>);

impl<'a, T: Send> Deref for SyncGuard<'a, T> {
    type Target = T;

    /// Borrows the guarded object; panics if the guarded slot is empty.
    fn deref(&self) -> &Self::Target {
        Option::as_ref(&self.0).unwrap()
    }
}

impl<'a, T: Send> DerefMut for SyncGuard<'a, T> {
    /// Mutably borrows the guarded object; panics if the guarded slot is
    /// empty.
    fn deref_mut(&mut self) -> &mut Self::Target {
        Option::as_mut(&mut self.0).unwrap()
    }
}

impl<'a, T: Send> AsRef<T> for SyncGuard<'a, T> {
    /// Same access as provided by `Deref`.
    fn as_ref(&self) -> &T {
        self.deref()
    }
}

impl<'a, T: Send> AsMut<T> for SyncGuard<'a, T> {
    /// Same access as provided by `DerefMut`.
    fn as_mut(&mut self) -> &mut T {
        self.deref_mut()
    }
}