mysql_async/queryable/
transaction.rs1use std::{fmt, ops::Deref};
10
11use crate::{connection_like::Connection, error::*, queryable::Queryable, Conn};
12
/// Transaction state tracked for a connection.
///
/// The variant order is significant: the enum is `#[repr(u8)]` with implicit
/// discriminants, so reordering variants would change their numeric values.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)]
pub enum TxStatus {
    /// A transaction has been started on the connection and is still open.
    InTransaction,
    /// A `Transaction` handle was dropped while its transaction was still
    /// open, so the connection has been flagged as needing a rollback.
    RequiresRollback,
    /// No transaction is active; this is the only state from which a new
    /// transaction may be started.
    None,
}
24
25impl Conn {
26    pub async fn start_transaction(&mut self, options: TxOpts) -> Result<Transaction<'_>> {
28        Transaction::new(self, options).await
29    }
30}
31
32#[derive(Eq, PartialEq, Debug, Hash, Clone, Default)]
45pub struct TxOpts {
46    consistent_snapshot: bool,
47    isolation_level: Option<IsolationLevel>,
48    readonly: Option<bool>,
49}
50
51impl TxOpts {
52    pub fn new() -> TxOpts {
54        TxOpts::default()
55    }
56
57    pub fn with_consistent_snapshot(&mut self, value: bool) -> &mut Self {
59        self.consistent_snapshot = value;
60        self
61    }
62
63    pub fn with_isolation_level<T>(&mut self, value: T) -> &mut Self
65    where
66        T: Into<Option<IsolationLevel>>,
67    {
68        self.isolation_level = value.into();
69        self
70    }
71
72    pub fn with_readonly<T>(&mut self, value: T) -> &mut Self
74    where
75        T: Into<Option<bool>>,
76    {
77        self.readonly = value.into();
78        self
79    }
80
81    pub fn consistent_snapshot(&self) -> bool {
84        self.consistent_snapshot
85    }
86
87    pub fn isolation_level(&self) -> Option<IsolationLevel> {
90        self.isolation_level
91    }
92
93    pub fn readonly(&self) -> Option<bool> {
98        self.readonly
99    }
100}
101
/// Transaction isolation level.
///
/// Each variant is rendered by its `Display` implementation as the keyword
/// sequence placed after `SET TRANSACTION ISOLATION LEVEL`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum IsolationLevel {
    /// Rendered as `READ UNCOMMITTED`.
    ReadUncommitted,
    /// Rendered as `READ COMMITTED`.
    ReadCommitted,
    /// Rendered as `REPEATABLE READ`.
    RepeatableRead,
    /// Rendered as `SERIALIZABLE`.
    Serializable,
}
110
111impl fmt::Display for IsolationLevel {
112    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113        match *self {
114            IsolationLevel::ReadUncommitted => write!(f, "READ UNCOMMITTED"),
115            IsolationLevel::ReadCommitted => write!(f, "READ COMMITTED"),
116            IsolationLevel::RepeatableRead => write!(f, "REPEATABLE READ"),
117            IsolationLevel::Serializable => write!(f, "SERIALIZABLE"),
118        }
119    }
120}
121
122#[must_use = "transaction object must be committed or rolled back explicitly"]
130#[derive(Debug)]
131pub struct Transaction<'a>(pub(crate) Connection<'a, 'static>);
132
133impl<'a> Transaction<'a> {
134    pub(crate) async fn new<T: Into<Connection<'a, 'static>>>(
135        conn: T,
136        options: TxOpts,
137    ) -> Result<Transaction<'a>> {
138        let TxOpts {
139            consistent_snapshot,
140            isolation_level,
141            readonly,
142        } = options;
143
144        let mut conn = conn.into();
145
146        conn.as_mut().clean_dirty().await?;
147
148        if conn.get_tx_status() != TxStatus::None {
149            return Err(DriverError::NestedTransaction.into());
150        }
151
152        if readonly.is_some() && conn.server_version() < (5, 6, 5) {
153            return Err(DriverError::ReadOnlyTransNotSupported.into());
154        }
155
156        if let Some(isolation_level) = isolation_level {
157            let query = format!("SET TRANSACTION ISOLATION LEVEL {}", isolation_level);
158            conn.as_mut().query_drop(query).await?;
159        }
160
161        if let Some(readonly) = readonly {
162            if readonly {
163                conn.as_mut()
164                    .query_drop("SET TRANSACTION READ ONLY")
165                    .await?;
166            } else {
167                conn.as_mut()
168                    .query_drop("SET TRANSACTION READ WRITE")
169                    .await?;
170            }
171        }
172
173        if consistent_snapshot {
174            conn.as_mut()
175                .query_drop("START TRANSACTION WITH CONSISTENT SNAPSHOT")
176                .await?
177        } else {
178            conn.as_mut().query_drop("START TRANSACTION").await?
179        };
180
181        conn.as_mut().set_tx_status(TxStatus::InTransaction);
182        Ok(Transaction(conn))
183    }
184
185    async fn try_commit(&mut self) -> Result<()> {
187        let result = self.0.as_mut().query_iter("COMMIT").await?;
188        result.drop_result().await?;
189        self.0.as_mut().set_tx_status(TxStatus::None);
190        Ok(())
191    }
192
193    pub async fn commit(mut self) -> Result<()> {
195        match self.try_commit().await {
196            Ok(..) => Ok(()),
197            Err(e) => {
198                self.0.as_mut().rollback_transaction().await.unwrap_or(());
199                Err(e)
200            }
201        }
202    }
203
204    pub async fn rollback(mut self) -> Result<()> {
206        self.0.as_mut().rollback_transaction().await
207    }
208}
209
210impl Deref for Transaction<'_> {
211    type Target = Conn;
212
213    fn deref(&self) -> &Self::Target {
214        &self.0
215    }
216}
217
218impl Drop for Transaction<'_> {
219    fn drop(&mut self) {
220        if self.0.get_tx_status() == TxStatus::InTransaction {
221            self.0.as_mut().set_tx_status(TxStatus::RequiresRollback);
222        }
223    }
224}