use crate::connection::ConnectionRef;
use crate::{CancelToken, CopyInWriter, CopyOutReader, Portal, RowIter, Statement, ToStatement};
use tokio_postgres::types::{BorrowToSql, ToSql, Type};
use tokio_postgres::{Error, Row, SimpleQueryMessage};
pub struct Transaction<'a> {
connection: ConnectionRef<'a>,
transaction: Option<tokio_postgres::Transaction<'a>>,
}
impl<'a> Drop for Transaction<'a> {
fn drop(&mut self) {
if let Some(transaction) = self.transaction.take() {
let _ = self.connection.block_on(transaction.rollback());
}
}
}
impl<'a> Transaction<'a> {
pub(crate) fn new(
connection: ConnectionRef<'a>,
transaction: tokio_postgres::Transaction<'a>,
) -> Transaction<'a> {
Transaction {
connection,
transaction: Some(transaction),
}
}
pub fn commit(mut self) -> Result<(), Error> {
self.connection
.block_on(self.transaction.take().unwrap().commit())
}
pub fn rollback(mut self) -> Result<(), Error> {
self.connection
.block_on(self.transaction.take().unwrap().rollback())
}
pub fn prepare(&mut self, query: &str) -> Result<Statement, Error> {
self.connection
.block_on(self.transaction.as_ref().unwrap().prepare(query))
}
pub fn prepare_typed(&mut self, query: &str, types: &[Type]) -> Result<Statement, Error> {
self.connection.block_on(
self.transaction
.as_ref()
.unwrap()
.prepare_typed(query, types),
)
}
pub fn execute<T>(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result<u64, Error>
where
T: ?Sized + ToStatement,
{
self.connection
.block_on(self.transaction.as_ref().unwrap().execute(query, params))
}
pub fn query<T>(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>, Error>
where
T: ?Sized + ToStatement,
{
self.connection
.block_on(self.transaction.as_ref().unwrap().query(query, params))
}
pub fn query_one<T>(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result<Row, Error>
where
T: ?Sized + ToStatement,
{
self.connection
.block_on(self.transaction.as_ref().unwrap().query_one(query, params))
}
pub fn query_opt<T>(
&mut self,
query: &T,
params: &[&(dyn ToSql + Sync)],
) -> Result<Option<Row>, Error>
where
T: ?Sized + ToStatement,
{
self.connection
.block_on(self.transaction.as_ref().unwrap().query_opt(query, params))
}
pub fn query_raw<T, P, I>(&mut self, query: &T, params: I) -> Result<RowIter<'_>, Error>
where
T: ?Sized + ToStatement,
P: BorrowToSql,
I: IntoIterator<Item = P>,
I::IntoIter: ExactSizeIterator,
{
let stream = self
.connection
.block_on(self.transaction.as_ref().unwrap().query_raw(query, params))?;
Ok(RowIter::new(self.connection.as_ref(), stream))
}
pub fn query_typed(
&mut self,
statement: &str,
params: &[(&(dyn ToSql + Sync), Type)],
) -> Result<Vec<Row>, Error> {
self.connection.block_on(
self.transaction
.as_ref()
.unwrap()
.query_typed(statement, params),
)
}
pub fn query_typed_raw<P, I>(&mut self, query: &str, params: I) -> Result<RowIter<'_>, Error>
where
P: BorrowToSql,
I: IntoIterator<Item = (P, Type)>,
{
let stream = self.connection.block_on(
self.transaction
.as_ref()
.unwrap()
.query_typed_raw(query, params),
)?;
Ok(RowIter::new(self.connection.as_ref(), stream))
}
pub fn bind<T>(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result<Portal, Error>
where
T: ?Sized + ToStatement,
{
self.connection
.block_on(self.transaction.as_ref().unwrap().bind(query, params))
}
pub fn query_portal(&mut self, portal: &Portal, max_rows: i32) -> Result<Vec<Row>, Error> {
self.connection.block_on(
self.transaction
.as_ref()
.unwrap()
.query_portal(portal, max_rows),
)
}
pub fn query_portal_raw(
&mut self,
portal: &Portal,
max_rows: i32,
) -> Result<RowIter<'_>, Error> {
let stream = self.connection.block_on(
self.transaction
.as_ref()
.unwrap()
.query_portal_raw(portal, max_rows),
)?;
Ok(RowIter::new(self.connection.as_ref(), stream))
}
pub fn copy_in<T>(&mut self, query: &T) -> Result<CopyInWriter<'_>, Error>
where
T: ?Sized + ToStatement,
{
let sink = self
.connection
.block_on(self.transaction.as_ref().unwrap().copy_in(query))?;
Ok(CopyInWriter::new(self.connection.as_ref(), sink))
}
pub fn copy_out<T>(&mut self, query: &T) -> Result<CopyOutReader<'_>, Error>
where
T: ?Sized + ToStatement,
{
let stream = self
.connection
.block_on(self.transaction.as_ref().unwrap().copy_out(query))?;
Ok(CopyOutReader::new(self.connection.as_ref(), stream))
}
pub fn simple_query(&mut self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
self.connection
.block_on(self.transaction.as_ref().unwrap().simple_query(query))
}
pub fn batch_execute(&mut self, query: &str) -> Result<(), Error> {
self.connection
.block_on(self.transaction.as_ref().unwrap().batch_execute(query))
}
pub fn cancel_token(&self) -> CancelToken {
CancelToken::new(self.transaction.as_ref().unwrap().cancel_token())
}
pub fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
let transaction = self
.connection
.block_on(self.transaction.as_mut().unwrap().transaction())?;
Ok(Transaction::new(self.connection.as_ref(), transaction))
}
pub fn savepoint<I>(&mut self, name: I) -> Result<Transaction<'_>, Error>
where
I: Into<String>,
{
let transaction = self
.connection
.block_on(self.transaction.as_mut().unwrap().savepoint(name))?;
Ok(Transaction::new(self.connection.as_ref(), transaction))
}
}