tokio_postgres/
transaction_builder.rs

1use postgres_protocol::message::frontend;
2
3use crate::{codec::FrontendMessage, connection::RequestMessages, Client, Error, Transaction};
4
/// The isolation level of a database transaction.
///
/// Passed to [`TransactionBuilder::isolation_level`] and rendered as the `ISOLATION LEVEL` clause of the
/// `START TRANSACTION` statement.
///
/// Equality and hashing compare the variants themselves: `ReadUncommitted` and `ReadCommitted` are distinct values
/// of this type even though the former is documented as equivalent to the latter.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum IsolationLevel {
    /// Equivalent to `ReadCommitted`.
    ReadUncommitted,

    /// An individual statement in the transaction will see rows committed before it began.
    ReadCommitted,

    /// All statements in the transaction will see the same view of rows committed before the first query in the
    /// transaction.
    RepeatableRead,

    /// The reads and writes in this transaction must be able to be committed as an atomic "unit" with respect to reads
    /// and writes of all other concurrent serializable transactions without interleaving.
    Serializable,
}
23
24/// A builder for database transactions.
25pub struct TransactionBuilder<'a> {
26    client: &'a mut Client,
27    isolation_level: Option<IsolationLevel>,
28    read_only: Option<bool>,
29    deferrable: Option<bool>,
30}
31
32impl<'a> TransactionBuilder<'a> {
33    pub(crate) fn new(client: &'a mut Client) -> TransactionBuilder<'a> {
34        TransactionBuilder {
35            client,
36            isolation_level: None,
37            read_only: None,
38            deferrable: None,
39        }
40    }
41
42    /// Sets the isolation level of the transaction.
43    pub fn isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
44        self.isolation_level = Some(isolation_level);
45        self
46    }
47
48    /// Sets the access mode of the transaction.
49    pub fn read_only(mut self, read_only: bool) -> Self {
50        self.read_only = Some(read_only);
51        self
52    }
53
54    /// Sets the deferrability of the transaction.
55    ///
56    /// If the transaction is also serializable and read only, creation of the transaction may block, but when it
57    /// completes the transaction is able to run with less overhead and a guarantee that it will not be aborted due to
58    /// serialization failure.
59    pub fn deferrable(mut self, deferrable: bool) -> Self {
60        self.deferrable = Some(deferrable);
61        self
62    }
63
64    /// Begins the transaction.
65    ///
66    /// The transaction will roll back by default - use the `commit` method to commit it.
67    pub async fn start(self) -> Result<Transaction<'a>, Error> {
68        let mut query = "START TRANSACTION".to_string();
69        let mut first = true;
70
71        if let Some(level) = self.isolation_level {
72            first = false;
73
74            query.push_str(" ISOLATION LEVEL ");
75            let level = match level {
76                IsolationLevel::ReadUncommitted => "READ UNCOMMITTED",
77                IsolationLevel::ReadCommitted => "READ COMMITTED",
78                IsolationLevel::RepeatableRead => "REPEATABLE READ",
79                IsolationLevel::Serializable => "SERIALIZABLE",
80            };
81            query.push_str(level);
82        }
83
84        if let Some(read_only) = self.read_only {
85            if !first {
86                query.push(',');
87            }
88            first = false;
89
90            let s = if read_only {
91                " READ ONLY"
92            } else {
93                " READ WRITE"
94            };
95            query.push_str(s);
96        }
97
98        if let Some(deferrable) = self.deferrable {
99            if !first {
100                query.push(',');
101            }
102
103            let s = if deferrable {
104                " DEFERRABLE"
105            } else {
106                " NOT DEFERRABLE"
107            };
108            query.push_str(s);
109        }
110
111        struct RollbackIfNotDone<'me> {
112            client: &'me Client,
113            done: bool,
114        }
115
116        impl<'a> Drop for RollbackIfNotDone<'a> {
117            fn drop(&mut self) {
118                if self.done {
119                    return;
120                }
121
122                let buf = self.client.inner().with_buf(|buf| {
123                    frontend::query("ROLLBACK", buf).unwrap();
124                    buf.split().freeze()
125                });
126                let _ = self
127                    .client
128                    .inner()
129                    .send(RequestMessages::Single(FrontendMessage::Raw(buf)));
130            }
131        }
132
133        // This is done as `Future` created by this method can be dropped after
134        // `RequestMessages` is synchronously send to the `Connection` by
135        // `batch_execute()`, but before `Responses` is asynchronously polled to
136        // completion. In that case `Transaction` won't be created and thus
137        // won't be rolled back.
138        {
139            let mut cleaner = RollbackIfNotDone {
140                client: self.client,
141                done: false,
142            };
143            self.client.batch_execute(&query).await?;
144            cleaner.done = true;
145        }
146
147        Ok(Transaction::new(self.client))
148    }
149}