mysql_async/queryable/
transaction.rs
1use std::{fmt, ops::Deref};
10
11use crate::{connection_like::Connection, error::*, queryable::Queryable, Conn};
12
13#[derive(Debug, Copy, Clone, Eq, PartialEq)]
15#[repr(u8)]
16pub enum TxStatus {
17 InTransaction,
19 RequiresRollback,
21 None,
23}
24
25impl Conn {
26 pub async fn start_transaction(&mut self, options: TxOpts) -> Result<Transaction<'_>> {
28 Transaction::new(self, options).await
29 }
30}
31
32#[derive(Eq, PartialEq, Debug, Hash, Clone, Default)]
45pub struct TxOpts {
46 consistent_snapshot: bool,
47 isolation_level: Option<IsolationLevel>,
48 readonly: Option<bool>,
49}
50
51impl TxOpts {
52 pub fn new() -> TxOpts {
54 TxOpts::default()
55 }
56
57 pub fn with_consistent_snapshot(&mut self, value: bool) -> &mut Self {
59 self.consistent_snapshot = value;
60 self
61 }
62
63 pub fn with_isolation_level<T>(&mut self, value: T) -> &mut Self
65 where
66 T: Into<Option<IsolationLevel>>,
67 {
68 self.isolation_level = value.into();
69 self
70 }
71
72 pub fn with_readonly<T>(&mut self, value: T) -> &mut Self
74 where
75 T: Into<Option<bool>>,
76 {
77 self.readonly = value.into();
78 self
79 }
80
81 pub fn consistent_snapshot(&self) -> bool {
84 self.consistent_snapshot
85 }
86
87 pub fn isolation_level(&self) -> Option<IsolationLevel> {
90 self.isolation_level
91 }
92
93 pub fn readonly(&self) -> Option<bool> {
98 self.readonly
99 }
100}
101
102#[derive(PartialEq, Eq, Clone, Copy, Debug, Hash)]
104pub enum IsolationLevel {
105 ReadUncommitted,
106 ReadCommitted,
107 RepeatableRead,
108 Serializable,
109}
110
111impl fmt::Display for IsolationLevel {
112 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113 match *self {
114 IsolationLevel::ReadUncommitted => write!(f, "READ UNCOMMITTED"),
115 IsolationLevel::ReadCommitted => write!(f, "READ COMMITTED"),
116 IsolationLevel::RepeatableRead => write!(f, "REPEATABLE READ"),
117 IsolationLevel::Serializable => write!(f, "SERIALIZABLE"),
118 }
119 }
120}
121
122#[must_use = "transaction object must be committed or rolled back explicitly"]
130#[derive(Debug)]
131pub struct Transaction<'a>(pub(crate) Connection<'a, 'static>);
132
133impl<'a> Transaction<'a> {
134 pub(crate) async fn new<T: Into<Connection<'a, 'static>>>(
135 conn: T,
136 options: TxOpts,
137 ) -> Result<Transaction<'a>> {
138 let TxOpts {
139 consistent_snapshot,
140 isolation_level,
141 readonly,
142 } = options;
143
144 let mut conn = conn.into();
145
146 if conn.get_tx_status() != TxStatus::None {
147 return Err(DriverError::NestedTransaction.into());
148 }
149
150 if readonly.is_some() && conn.server_version() < (5, 6, 5) {
151 return Err(DriverError::ReadOnlyTransNotSupported.into());
152 }
153
154 if let Some(isolation_level) = isolation_level {
155 let query = format!("SET TRANSACTION ISOLATION LEVEL {}", isolation_level);
156 conn.query_drop(query).await?;
157 }
158
159 if let Some(readonly) = readonly {
160 if readonly {
161 conn.query_drop("SET TRANSACTION READ ONLY").await?;
162 } else {
163 conn.query_drop("SET TRANSACTION READ WRITE").await?;
164 }
165 }
166
167 if consistent_snapshot {
168 conn.query_drop("START TRANSACTION WITH CONSISTENT SNAPSHOT")
169 .await?
170 } else {
171 conn.query_drop("START TRANSACTION").await?
172 };
173
174 conn.set_tx_status(TxStatus::InTransaction);
175 Ok(Transaction(conn))
176 }
177
178 pub async fn commit(mut self) -> Result<()> {
180 let result = self.0.query_iter("COMMIT").await?;
181 result.drop_result().await?;
182 self.0.set_tx_status(TxStatus::None);
183 Ok(())
184 }
185
186 pub async fn rollback(mut self) -> Result<()> {
188 let result = self.0.query_iter("ROLLBACK").await?;
189 result.drop_result().await?;
190 self.0.set_tx_status(TxStatus::None);
191 Ok(())
192 }
193}
194
195impl Deref for Transaction<'_> {
196 type Target = Conn;
197
198 fn deref(&self) -> &Self::Target {
199 &self.0
200 }
201}
202
203impl Drop for Transaction<'_> {
204 fn drop(&mut self) {
205 if self.0.get_tx_status() == TxStatus::InTransaction {
206 self.0.set_tx_status(TxStatus::RequiresRollback);
207 }
208 }
209}