use std::{fmt, ops::Deref};
use crate::{connection_like::Connection, error::*, queryable::Queryable, Conn};
/// Transaction state tracked for a connection.
///
/// The value is read through `get_tx_status` and written through
/// `set_tx_status` by the `Transaction` type in this module.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
// Stored as a single byte; implicit discriminants follow declaration order.
#[repr(u8)]
pub enum TxStatus {
/// A transaction is open. `Transaction::new` sets this once its
/// `START TRANSACTION` statement has succeeded.
InTransaction,
/// A `Transaction` was dropped while the status was still `InTransaction`,
/// i.e. without a successful `commit` or `rollback`.
// NOTE(review): the code that actually issues the pending rollback is not
// visible in this file; presumably the connection does it later.
RequiresRollback,
/// No transaction is active. `Transaction::new` refuses to start unless the
/// status is `None`, and a successful `commit`/`rollback` restores it.
None,
}
impl Conn {
    /// Opens a new transaction on this connection, configured by `options`.
    ///
    /// The work is delegated to `Transaction::new`. The returned
    /// [`Transaction`] mutably borrows the connection for as long as it lives.
    ///
    /// # Errors
    ///
    /// Returns whatever error `Transaction::new` reports, for example a
    /// nested-transaction error or a failure of one of the statements it
    /// sends to the server.
    pub async fn start_transaction(&mut self, options: TxOpts) -> Result<Transaction<'_>> {
        let transaction = Transaction::new(self, options).await?;
        Ok(transaction)
    }
}
/// Options that control how a transaction is started.
///
/// The `Default` value (also returned by [`TxOpts::new`]) has the consistent
/// snapshot flag turned off and leaves both the isolation level and the access
/// mode unset.
///
/// The setters take and return `&mut Self`, so they can be chained on a
/// mutable binding.
#[derive(Eq, PartialEq, Debug, Hash, Clone, Default)]
pub struct TxOpts {
    /// When `true` the transaction is started with
    /// `START TRANSACTION WITH CONSISTENT SNAPSHOT`.
    consistent_snapshot: bool,
    /// Isolation level to set before the transaction starts, if any.
    isolation_level: Option<IsolationLevel>,
    /// Access mode to set before the transaction starts:
    /// `Some(true)` is read-only, `Some(false)` is read-write, `None` sets nothing.
    readonly: Option<bool>,
}

impl TxOpts {
    /// Creates options with every setting at its default value.
    pub fn new() -> TxOpts {
        Self::default()
    }

    /// Sets the consistent snapshot flag and returns `self` for chaining.
    pub fn with_consistent_snapshot(&mut self, value: bool) -> &mut Self {
        self.consistent_snapshot = value;
        self
    }

    /// Sets the isolation level and returns `self` for chaining.
    ///
    /// Accepts either an `IsolationLevel` or an `Option<IsolationLevel>`;
    /// passing `None` clears a previously chosen level.
    pub fn with_isolation_level<T>(&mut self, value: T) -> &mut Self
    where
        T: Into<Option<IsolationLevel>>,
    {
        let level: Option<IsolationLevel> = value.into();
        self.isolation_level = level;
        self
    }

    /// Sets the access mode and returns `self` for chaining.
    ///
    /// Accepts either a `bool` or an `Option<bool>`; passing `None` clears a
    /// previously chosen mode.
    pub fn with_readonly<T>(&mut self, value: T) -> &mut Self
    where
        T: Into<Option<bool>>,
    {
        let access_mode: Option<bool> = value.into();
        self.readonly = access_mode;
        self
    }

    /// Returns the consistent snapshot flag.
    pub fn consistent_snapshot(&self) -> bool {
        let TxOpts {
            consistent_snapshot,
            ..
        } = self;
        *consistent_snapshot
    }

    /// Returns the configured isolation level, if one was set.
    pub fn isolation_level(&self) -> Option<IsolationLevel> {
        let TxOpts {
            isolation_level, ..
        } = self;
        *isolation_level
    }

    /// Returns the configured access mode, if one was set.
    pub fn readonly(&self) -> Option<bool> {
        let TxOpts { readonly, .. } = self;
        *readonly
    }
}
/// Transaction isolation level.
///
/// The `Display` implementation renders each variant as the keyword sequence
/// that `Transaction::new` appends to `SET TRANSACTION ISOLATION LEVEL`.
#[derive(PartialEq, Eq, Clone, Copy, Debug, Hash)]
pub enum IsolationLevel {
    /// Rendered as `READ UNCOMMITTED`.
    ReadUncommitted,
    /// Rendered as `READ COMMITTED`.
    ReadCommitted,
    /// Rendered as `REPEATABLE READ`.
    RepeatableRead,
    /// Rendered as `SERIALIZABLE`.
    Serializable,
}

impl fmt::Display for IsolationLevel {
    /// Writes the SQL spelling of the level to `f`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keywords = match self {
            Self::ReadUncommitted => "READ UNCOMMITTED",
            Self::ReadCommitted => "READ COMMITTED",
            Self::RepeatableRead => "REPEATABLE READ",
            Self::Serializable => "SERIALIZABLE",
        };
        // `write_str` emits the text verbatim; width and fill flags are not applied.
        f.write_str(keywords)
    }
}
/// Handle to an open transaction on a connection.
///
/// A value is produced by `Transaction::new` after the server has accepted a
/// `START TRANSACTION` statement, and is meant to be finished explicitly with
/// [`Transaction::commit`] or [`Transaction::rollback`]. If it is dropped
/// while the connection status is still `TxStatus::InTransaction`, the `Drop`
/// implementation switches the status to `TxStatus::RequiresRollback`.
#[must_use = "transaction object must be committed or rolled back explicitly"]
#[derive(Debug)]
pub struct Transaction<'a>(pub(crate) Connection<'a, 'static>);

impl<'a> Transaction<'a> {
    /// Starts a transaction on `conn` according to `options`.
    ///
    /// Statements are sent in this order, each one only when applicable:
    /// 1. `SET TRANSACTION ISOLATION LEVEL <level>` if an isolation level is set;
    /// 2. `SET TRANSACTION READ ONLY` / `READ WRITE` if an access mode is set;
    /// 3. `START TRANSACTION`, with `WITH CONSISTENT SNAPSHOT` when requested.
    ///
    /// After the last statement succeeds the connection status becomes
    /// `TxStatus::InTransaction`.
    ///
    /// # Errors
    ///
    /// * `DriverError::NestedTransaction` if the connection status is not
    ///   `TxStatus::None`.
    /// * `DriverError::ReadOnlyTransNotSupported` if an access mode is set and
    ///   the server version is lower than 5.6.5.
    /// * Any error returned while executing the statements above.
    pub(crate) async fn new<T: Into<Connection<'a, 'static>>>(
        conn: T,
        options: TxOpts,
    ) -> Result<Transaction<'a>> {
        let TxOpts {
            consistent_snapshot: with_snapshot,
            isolation_level: level,
            readonly: access_mode,
        } = options;

        let mut conn = conn.into();

        // Both checks run before anything is sent to the server.
        if conn.get_tx_status() != TxStatus::None {
            return Err(DriverError::NestedTransaction.into());
        }
        if access_mode.is_some() && conn.server_version() < (5, 6, 5) {
            return Err(DriverError::ReadOnlyTransNotSupported.into());
        }

        if let Some(level) = level {
            conn.query_drop(format!("SET TRANSACTION ISOLATION LEVEL {}", level))
                .await?;
        }

        match access_mode {
            Some(true) => conn.query_drop("SET TRANSACTION READ ONLY").await?,
            Some(false) => conn.query_drop("SET TRANSACTION READ WRITE").await?,
            None => {}
        }

        let start_statement = if with_snapshot {
            "START TRANSACTION WITH CONSISTENT SNAPSHOT"
        } else {
            "START TRANSACTION"
        };
        conn.query_drop(start_statement).await?;

        conn.set_tx_status(TxStatus::InTransaction);
        Ok(Transaction(conn))
    }

    /// Commits the transaction by executing `COMMIT` and discarding its result.
    ///
    /// On success the connection status is reset to `TxStatus::None`.
    ///
    /// # Errors
    ///
    /// Returns the error from executing the statement or from dropping its
    /// result. In that case this method does not touch the status, and `self`
    /// is dropped as usual.
    pub async fn commit(mut self) -> Result<()> {
        self.0.query_iter("COMMIT").await?.drop_result().await?;
        self.0.set_tx_status(TxStatus::None);
        Ok(())
    }

    /// Rolls the transaction back by executing `ROLLBACK` and discarding its
    /// result.
    ///
    /// On success the connection status is reset to `TxStatus::None`.
    ///
    /// # Errors
    ///
    /// Returns the error from executing the statement or from dropping its
    /// result. In that case this method does not touch the status, and `self`
    /// is dropped as usual.
    pub async fn rollback(mut self) -> Result<()> {
        self.0.query_iter("ROLLBACK").await?.drop_result().await?;
        self.0.set_tx_status(TxStatus::None);
        Ok(())
    }
}
/// Gives shared access to the connection the transaction runs on.
impl Deref for Transaction<'_> {
    type Target = Conn;

    /// Borrows the wrapped connection as a `Conn`.
    fn deref(&self) -> &Self::Target {
        let conn: &Conn = &self.0;
        conn
    }
}
/// Flags the connection when a transaction goes out of scope unfinished.
impl Drop for Transaction<'_> {
    /// Switches the connection status from `TxStatus::InTransaction` to
    /// `TxStatus::RequiresRollback`; any other status is left as it is.
    fn drop(&mut self) {
        // After a successful commit or rollback the status is no longer
        // `InTransaction`, so there is nothing to flag.
        if self.0.get_tx_status() != TxStatus::InTransaction {
            return;
        }
        self.0.set_tx_status(TxStatus::RequiresRollback);
    }
}