duckdb/
transaction.rs

1use crate::{Connection, Result};
2use std::ops::Deref;
3
/// What a [`Transaction`] does with its pending changes when it goes out of
/// scope without having been explicitly committed or rolled back.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum DropBehavior {
    /// Discard the changes by rolling back. This is the behavior a freshly
    /// created transaction starts with.
    Rollback,

    /// Persist the changes by committing.
    Commit,

    /// Neither commit nor roll back. The transaction is left open on the
    /// connection, so use this with care.
    Ignore,

    /// Panic. Useful during development to enforce that every transaction is
    /// finished intentionally.
    Panic,
}
20
21/// Represents a transaction on a database connection.
22///
23/// ## Note
24///
25/// Transactions will roll back by default. Use `commit` method to explicitly
26/// commit the transaction, or use `set_drop_behavior` to change what happens
27/// when the transaction is dropped.
28///
29/// ## Example
30///
31/// ```rust,no_run
32/// # use duckdb::{Connection, Result};
33/// # fn do_queries_part_1(_conn: &Connection) -> Result<()> { Ok(()) }
34/// # fn do_queries_part_2(_conn: &Connection) -> Result<()> { Ok(()) }
35/// fn perform_queries(conn: &mut Connection) -> Result<()> {
36///     let tx = conn.transaction()?;
37///
38///     do_queries_part_1(&tx)?; // tx causes rollback if this fails
39///     do_queries_part_2(&tx)?; // tx causes rollback if this fails
40///
41///     tx.commit()
42/// }
43/// ```
44#[derive(Debug)]
45pub struct Transaction<'conn> {
46    conn: &'conn Connection,
47    drop_behavior: DropBehavior,
48}
49
50impl Transaction<'_> {
51    /// Begin a new transaction. Cannot be nested.
52    ///
53    /// Even though we don't mutate the connection, we take a `&mut Connection`
54    /// so as to prevent nested transactions on the same connection. For cases
55    /// where this is unacceptable, [`Transaction::new_unchecked`] is available.
56    #[inline]
57    pub fn new(conn: &mut Connection) -> Result<Transaction<'_>> {
58        Self::new_unchecked(conn)
59    }
60
61    /// Begin a new transaction, failing if a transaction is open.
62    ///
63    /// If a transaction is already open, this will return an error. Where
64    /// possible, [`Transaction::new`] should be preferred, as it provides a
65    /// compile-time guarantee that transactions are not nested.
66    #[inline]
67    pub fn new_unchecked(conn: &Connection) -> Result<Transaction<'_>> {
68        let query = "BEGIN TRANSACTION";
69        conn.execute_batch(query).map(move |_| Transaction {
70            conn,
71            drop_behavior: DropBehavior::Rollback,
72        })
73    }
74
75    /// Get the current setting for what happens to the transaction when it is
76    /// dropped.
77    #[inline]
78    pub fn drop_behavior(&self) -> DropBehavior {
79        self.drop_behavior
80    }
81
82    /// Configure the transaction to perform the specified action when it is
83    /// dropped.
84    #[inline]
85    pub fn set_drop_behavior(&mut self, drop_behavior: DropBehavior) {
86        self.drop_behavior = drop_behavior
87    }
88
89    /// A convenience method which consumes and commits a transaction.
90    #[inline]
91    pub fn commit(mut self) -> Result<()> {
92        self.commit_()
93    }
94
95    #[inline]
96    fn commit_(&mut self) -> Result<()> {
97        self.conn.execute_batch("COMMIT")
98    }
99
100    /// A convenience method which consumes and rolls back a transaction.
101    #[inline]
102    pub fn rollback(mut self) -> Result<()> {
103        self.rollback_()
104    }
105
106    #[inline]
107    fn rollback_(&mut self) -> Result<()> {
108        self.conn.execute_batch("ROLLBACK")
109    }
110
111    /// Consumes the transaction, committing or rolling back according to the
112    /// current setting (see `drop_behavior`).
113    ///
114    /// Functionally equivalent to the `Drop` implementation, but allows
115    /// callers to see any errors that occur.
116    #[inline]
117    pub fn finish(mut self) -> Result<()> {
118        self.finish_()
119    }
120
121    #[inline]
122    fn finish_(&mut self) -> Result<()> {
123        // if self.conn.is_autocommit() {
124        //     println!("is autocommit");
125        //     return Ok(());
126        // }
127        match self.drop_behavior() {
128            DropBehavior::Commit => self.commit_().or_else(|_| self.rollback_()),
129            DropBehavior::Rollback => self.rollback_(),
130            DropBehavior::Ignore => Ok(()),
131            DropBehavior::Panic => panic!("Transaction dropped unexpectedly."),
132        }
133    }
134}
135
136impl Deref for Transaction<'_> {
137    type Target = Connection;
138
139    #[inline]
140    fn deref(&self) -> &Connection {
141        self.conn
142    }
143}
144
145#[allow(unused_must_use)]
146impl Drop for Transaction<'_> {
147    #[inline]
148    fn drop(&mut self) {
149        self.finish_();
150    }
151}
152
153impl Connection {
154    /// Begin a new transaction with the default behavior (DEFERRED).
155    ///
156    /// The transaction defaults to rolling back when it is dropped. If you
157    /// want the transaction to commit, you must call
158    /// [`commit`](Transaction::commit) or [`set_drop_behavior(DropBehavior:
159    /// :Commit)`](Transaction::set_drop_behavior).
160    ///
161    /// ## Example
162    ///
163    /// ```rust,no_run
164    /// # use duckdb::{Connection, Result};
165    /// # fn do_queries_part_1(_conn: &Connection) -> Result<()> { Ok(()) }
166    /// # fn do_queries_part_2(_conn: &Connection) -> Result<()> { Ok(()) }
167    /// fn perform_queries(conn: &mut Connection) -> Result<()> {
168    ///     let tx = conn.transaction()?;
169    ///
170    ///     do_queries_part_1(&tx)?; // tx causes rollback if this fails
171    ///     do_queries_part_2(&tx)?; // tx causes rollback if this fails
172    ///
173    ///     tx.commit()
174    /// }
175    /// ```
176    ///
177    /// # Failure
178    ///
179    /// Will return `Err` if the underlying DuckDB call fails.
180    #[inline]
181    pub fn transaction(&mut self) -> Result<Transaction<'_>> {
182        Transaction::new(self)
183    }
184
185    /// Begin a new transaction with the default behavior (DEFERRED).
186    ///
187    /// Attempt to open a nested transaction will result in a DuckDB error.
188    /// `Connection::transaction` prevents this at compile time by taking `&mut
189    /// self`, but `Connection::unchecked_transaction()` may be used to defer
190    /// the checking until runtime.
191    ///
192    /// See [`Connection::transaction`] and [`Transaction::new_unchecked`]
193    /// (which can be used if the default transaction behavior is undesirable).
194    ///
195    /// ## Example
196    ///
197    /// ```rust,no_run
198    /// # use duckdb::{Connection, Result};
199    /// # use std::rc::Rc;
200    /// # fn do_queries_part_1(_conn: &Connection) -> Result<()> { Ok(()) }
201    /// # fn do_queries_part_2(_conn: &Connection) -> Result<()> { Ok(()) }
202    /// fn perform_queries(conn: Rc<Connection>) -> Result<()> {
203    ///     let tx = conn.unchecked_transaction()?;
204    ///
205    ///     do_queries_part_1(&tx)?; // tx causes rollback if this fails
206    ///     do_queries_part_2(&tx)?; // tx causes rollback if this fails
207    ///
208    ///     tx.commit()
209    /// }
210    /// ```
211    ///
212    /// # Failure
213    ///
214    /// Will return `Err` if the underlying DuckDB call fails. The specific
215    /// error returned if transactions are nested is currently unspecified.
216    pub fn unchecked_transaction(&self) -> Result<Transaction<'_>> {
217        Transaction::new_unchecked(self)
218    }
219}
220
#[cfg(test)]
mod test {
    use super::DropBehavior;
    use crate::{Connection, Result};

    /// Opens an in-memory database containing an empty table `foo(x INTEGER)`.
    fn memory_db_with_foo() -> Result<Connection> {
        let db = Connection::open_in_memory()?;
        db.execute_batch(
            r"
            CREATE TABLE foo (x INTEGER);
            -- SET AUTOCOMMIT TO OFF;
        ",
        )?;
        Ok(db)
    }

    /// Returns `SUM(x)` over `foo`, queried through `conn`.
    fn sum_of_x(conn: &Connection) -> Result<i32> {
        conn.query_row::<i32, _, _>("SELECT SUM(x) FROM foo", [], |row| row.get(0))
    }

    /// Dropping rolls back by default and commits once the drop behavior is
    /// switched to `Commit`.
    #[test]
    fn test_drop() -> Result<()> {
        let mut db = memory_db_with_foo()?;

        {
            // Left at the default drop behavior: this insert is rolled back.
            let tx = db.transaction()?;
            let inserted = tx.execute_batch("INSERT INTO foo VALUES(1)");
            assert!(inserted.is_ok());
        }

        {
            // Switched to commit-on-drop: this insert survives.
            let mut tx = db.transaction()?;
            tx.execute_batch("INSERT INTO foo VALUES(2)")?;
            tx.set_drop_behavior(DropBehavior::Commit);
        }

        {
            let tx = db.transaction()?;
            assert_eq!(2i32, sum_of_x(&tx)?);
        }

        Ok(())
    }

    /// Transactions opened through a shared reference behave like checked
    /// ones.
    #[test]
    fn test_unchecked_nesting() -> Result<()> {
        let db = memory_db_with_foo()?;

        {
            // The temporary is dropped right away, rolling back by default.
            db.unchecked_transaction()?;
        }

        {
            let tx = db.unchecked_transaction()?;
            tx.execute_batch("INSERT INTO foo VALUES(1)")?;
            // NOTE: a check that a nested `tx.unchecked_transaction()` fails
            // (without disturbing the ongoing transaction) used to sit here
            // and is currently disabled.
            tx.execute_batch("INSERT INTO foo VALUES(1)")?;
            tx.commit()?;
        }

        assert_eq!(2i32, sum_of_x(&db)?);
        Ok(())
    }

    /// Explicit `rollback` discards changes, explicit `commit` keeps them.
    #[test]
    fn test_explicit_rollback_commit() -> Result<()> {
        let mut db = memory_db_with_foo()?;

        {
            let tx = db.transaction()?;
            tx.execute_batch("INSERT INTO foo VALUES(1)")?;
            tx.rollback()?;
        }

        {
            let tx = db.transaction()?;
            tx.execute_batch("INSERT INTO foo VALUES(4)")?;
            tx.commit()?;
        }

        {
            let tx = db.transaction()?;
            assert_eq!(4i32, sum_of_x(&tx)?);
        }

        Ok(())
    }

    /// A transaction can be stored in an `Rc` and recovered from it.
    #[test]
    fn test_rc() -> Result<()> {
        use std::rc::Rc;

        let mut conn = Connection::open_in_memory()?;
        let shared_tx = Rc::new(conn.transaction()?);

        // `unwrap` on the `try_unwrap` result needs `Transaction: Debug`, so
        // this line only compiles while that derive is present.
        Rc::try_unwrap(shared_tx).unwrap();
        Ok(())
    }
}