deadpool/managed/
sync.rs
1pub mod reexports {
5 pub use super::super::reexports::*;
25 pub use super::{InteractError, SyncGuard};
26}
27
28use std::{
29 any::Any,
30 fmt,
31 marker::PhantomData,
32 ops::{Deref, DerefMut},
33 sync::{Arc, Mutex, MutexGuard, PoisonError, TryLockError},
34};
35
36use crate::{Runtime, SpawnBlockingError};
37
38#[derive(Debug)]
40pub enum InteractError<E> {
41 Panic(Box<dyn Any + Send + 'static>),
43
44 Aborted,
46
47 Backend(E),
49}
50
51impl<E: fmt::Display> fmt::Display for InteractError<E> {
52 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53 match self {
54 Self::Panic(_) => write!(f, "Panic"),
55 Self::Aborted => write!(f, "Aborted"),
56 Self::Backend(e) => write!(f, "Backend error: {}", e),
57 }
58 }
59}
60
61impl<E: std::error::Error + 'static> std::error::Error for InteractError<E> {
62 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
63 match self {
64 Self::Panic(_) | Self::Aborted => None,
65 Self::Backend(e) => Some(e),
66 }
67 }
68}
69
70#[must_use]
76pub struct SyncWrapper<T, E>
77where
78 T: Send + 'static,
79 E: Send + 'static,
80{
81 obj: Arc<Mutex<Option<T>>>,
82 runtime: Runtime,
83 _error: PhantomData<fn() -> E>,
84}
85
86impl<T, E> fmt::Debug for SyncWrapper<T, E>
88where
89 T: fmt::Debug + Send + 'static,
90 E: Send + 'static,
91{
92 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93 f.debug_struct("SyncWrapper")
94 .field("obj", &self.obj)
95 .field("runtime", &self.runtime)
96 .field("_error", &self._error)
97 .finish()
98 }
99}
100
101impl<T, E> SyncWrapper<T, E>
102where
103 T: Send + 'static,
104 E: Send + 'static,
105{
106 pub async fn new<F>(runtime: Runtime, f: F) -> Result<Self, E>
108 where
109 F: FnOnce() -> Result<T, E> + Send + 'static,
110 {
111 let result = match runtime.spawn_blocking(f).await {
112 Err(SpawnBlockingError::Panic(e)) => panic!("{:?}", e),
117 Ok(obj) => obj,
118 };
119 result.map(|obj| Self {
120 obj: Arc::new(Mutex::new(Some(obj))),
121 runtime,
122 _error: PhantomData::default(),
123 })
124 }
125
126 pub async fn interact<F, R>(&self, f: F) -> Result<R, InteractError<E>>
132 where
133 F: FnOnce(&mut T) -> Result<R, E> + Send + 'static,
134 R: Send + 'static,
135 {
136 let arc = self.obj.clone();
137 self.runtime
138 .spawn_blocking(move || {
139 let mut guard = arc.lock().unwrap();
140 let conn = guard.as_mut().unwrap();
141 f(conn)
142 })
143 .await
144 .map_err(|e| match e {
145 SpawnBlockingError::Panic(p) => InteractError::Panic(p),
146 })?
147 .map_err(InteractError::Backend)
148 }
149
150 pub fn is_mutex_poisoned(&self) -> bool {
154 self.obj.is_poisoned()
155 }
156
157 pub fn lock(&self) -> Result<SyncGuard<'_, T>, PoisonError<MutexGuard<'_, Option<T>>>> {
160 self.obj.lock().map(SyncGuard)
161 }
162
163 pub fn try_lock(&self) -> Result<SyncGuard<'_, T>, TryLockError<MutexGuard<'_, Option<T>>>> {
166 self.obj.try_lock().map(SyncGuard)
167 }
168}
169
170impl<T, E> Drop for SyncWrapper<T, E>
171where
172 T: Send + 'static,
173 E: Send + 'static,
174{
175 fn drop(&mut self) {
176 let arc = self.obj.clone();
177 self.runtime
180 .spawn_blocking_background(move || match arc.lock() {
181 Ok(mut guard) => drop(guard.take()),
182 Err(e) => drop(e.into_inner().take()),
183 })
184 .unwrap();
185 }
186}
187
188#[derive(Debug)]
195pub struct SyncGuard<'a, T: Send>(MutexGuard<'a, Option<T>>);
196
197impl<'a, T: Send> Deref for SyncGuard<'a, T> {
198 type Target = T;
199 fn deref(&self) -> &Self::Target {
200 self.0.as_ref().unwrap()
201 }
202}
203
204impl<'a, T: Send> DerefMut for SyncGuard<'a, T> {
205 fn deref_mut(&mut self) -> &mut Self::Target {
206 self.0.as_mut().unwrap()
207 }
208}
209
210impl<'a, T: Send> AsRef<T> for SyncGuard<'a, T> {
211 fn as_ref(&self) -> &T {
212 self.0.as_ref().unwrap()
213 }
214}
215
216impl<'a, T: Send> AsMut<T> for SyncGuard<'a, T> {
217 fn as_mut(&mut self) -> &mut T {
218 self.0.as_mut().unwrap()
219 }
220}