duckdb/
inner_connection.rs

1use std::{
2    ffi::{c_void, CStr, CString},
3    mem,
4    os::raw::c_char,
5    ptr, str,
6    sync::{Arc, Mutex},
7};
8
9use super::{ffi, Appender, Config, Connection, Result};
10use crate::{
11    error::{
12        result_from_duckdb_appender, result_from_duckdb_arrow, result_from_duckdb_extract, result_from_duckdb_prepare,
13        Error,
14    },
15    raw_statement::RawStatement,
16    statement::Statement,
17};
18
/// Raw DuckDB handles backing a connection, together with the bookkeeping
/// needed to release them.
pub struct InnerConnection {
    /// Raw database handle this connection was created from.
    pub db: ffi::duckdb_database,
    /// Raw connection handle filled in by `duckdb_connect`; `close` resets it to null.
    pub con: ffi::duckdb_connection,
    // Shared interrupt handle returned by `get_interrupt_handle`; `close` clears it.
    interrupt: Arc<InterruptHandle>,
    // When true, `close` also calls `duckdb_close` on `db`; connections made by
    // `try_clone` are created with this set to false.
    owned: bool,
}
25
26impl InnerConnection {
27    #[inline]
28    pub unsafe fn new(db: ffi::duckdb_database, owned: bool) -> Result<Self> {
29        let mut con: ffi::duckdb_connection = ptr::null_mut();
30        let r = ffi::duckdb_connect(db, &mut con);
31        if r != ffi::DuckDBSuccess {
32            ffi::duckdb_disconnect(&mut con);
33            return Err(Error::DuckDBFailure(
34                ffi::Error::new(r),
35                Some("connect error".to_owned()),
36            ));
37        }
38        let interrupt = Arc::new(InterruptHandle::new(con));
39
40        Ok(Self {
41            db,
42            con,
43            interrupt,
44            owned,
45        })
46    }
47
48    pub fn open_with_flags(c_path: &CStr, config: Config) -> Result<Self> {
49        unsafe {
50            let mut db: ffi::duckdb_database = ptr::null_mut();
51            let mut c_err = std::ptr::null_mut();
52            let r = ffi::duckdb_open_ext(c_path.as_ptr(), &mut db, config.duckdb_config(), &mut c_err);
53            if r != ffi::DuckDBSuccess {
54                let msg = Some(CStr::from_ptr(c_err).to_string_lossy().to_string());
55                ffi::duckdb_free(c_err as *mut c_void);
56                return Err(Error::DuckDBFailure(ffi::Error::new(r), msg));
57            }
58            Self::new(db, true)
59        }
60    }
61
62    pub fn close(&mut self) -> Result<()> {
63        if self.db.is_null() {
64            return Ok(());
65        }
66        if self.con.is_null() {
67            return Ok(());
68        }
69        unsafe {
70            ffi::duckdb_disconnect(&mut self.con);
71            self.con = ptr::null_mut();
72            self.interrupt.clear();
73
74            if self.owned {
75                ffi::duckdb_close(&mut self.db);
76                self.db = ptr::null_mut();
77            }
78        }
79        Ok(())
80    }
81
82    /// Creates a new connection to the already-opened database.
83    pub fn try_clone(&self) -> Result<Self> {
84        unsafe { Self::new(self.db, false) }
85    }
86
87    pub fn execute(&mut self, sql: &str) -> Result<()> {
88        let c_str = CString::new(sql)?;
89        unsafe {
90            let mut out = mem::zeroed();
91            let r = ffi::duckdb_query_arrow(self.con, c_str.as_ptr() as *const c_char, &mut out);
92            result_from_duckdb_arrow(r, out)?;
93            ffi::duckdb_destroy_arrow(&mut out);
94            Ok(())
95        }
96    }
97
98    pub fn prepare<'a>(&mut self, conn: &'a Connection, sql: &str) -> Result<Statement<'a>> {
99        let c_str = CString::new(sql)?;
100
101        // Extract statements (handles both single and multi-statement queries)
102        let mut extracted = ptr::null_mut();
103        let num_stmts =
104            unsafe { ffi::duckdb_extract_statements(self.con, c_str.as_ptr() as *const c_char, &mut extracted) };
105        result_from_duckdb_extract(num_stmts, extracted)?;
106
107        // Auto-cleanup on drop
108        let _guard = ExtractedStatementsGuard(extracted);
109
110        // Execute all intermediate statements
111        for i in 0..num_stmts - 1 {
112            self.execute_extracted_statement(extracted, i)?;
113        }
114
115        // Prepare and return final statement
116        let final_stmt = self.prepare_extracted_statement(extracted, num_stmts - 1)?;
117        Ok(Statement::new(conn, unsafe { RawStatement::new(final_stmt) }))
118    }
119
120    fn prepare_extracted_statement(
121        &self,
122        extracted: ffi::duckdb_extracted_statements,
123        index: ffi::idx_t,
124    ) -> Result<ffi::duckdb_prepared_statement> {
125        let mut stmt = ptr::null_mut();
126        let res = unsafe { ffi::duckdb_prepare_extracted_statement(self.con, extracted, index, &mut stmt) };
127        result_from_duckdb_prepare(res, stmt)?;
128        Ok(stmt)
129    }
130
131    fn execute_extracted_statement(
132        &self,
133        extracted: ffi::duckdb_extracted_statements,
134        index: ffi::idx_t,
135    ) -> Result<()> {
136        let mut stmt = self.prepare_extracted_statement(extracted, index)?;
137
138        let mut result = unsafe { mem::zeroed() };
139        let rc = unsafe { ffi::duckdb_execute_prepared(stmt, &mut result) };
140
141        let error = if rc != ffi::DuckDBSuccess {
142            unsafe {
143                let c_err = ffi::duckdb_result_error(&mut result as *mut _);
144                let msg = if c_err.is_null() {
145                    None
146                } else {
147                    Some(CStr::from_ptr(c_err).to_string_lossy().to_string())
148                };
149                Some(Error::DuckDBFailure(ffi::Error::new(rc), msg))
150            }
151        } else {
152            None
153        };
154
155        unsafe {
156            ffi::duckdb_destroy_prepare(&mut stmt);
157            ffi::duckdb_destroy_result(&mut result);
158        }
159
160        error.map_or(Ok(()), Err)
161    }
162
163    pub fn appender<'a>(&mut self, conn: &'a Connection, table: &str, schema: &str) -> Result<Appender<'a>> {
164        let mut c_app: ffi::duckdb_appender = ptr::null_mut();
165        let c_table = CString::new(table)?;
166        let c_schema = CString::new(schema)?;
167        let r = unsafe {
168            ffi::duckdb_appender_create(
169                self.con,
170                c_schema.as_ptr() as *const c_char,
171                c_table.as_ptr() as *const c_char,
172                &mut c_app,
173            )
174        };
175        result_from_duckdb_appender(r, &mut c_app)?;
176        Ok(Appender::new(conn, c_app))
177    }
178
179    pub fn appender_to_catalog_and_db<'a>(
180        &mut self,
181        conn: &'a Connection,
182        table: &str,
183        catalog: &str,
184        schema: &str,
185    ) -> Result<Appender<'a>> {
186        let mut c_app: ffi::duckdb_appender = ptr::null_mut();
187        let c_table = CString::new(table)?;
188        let c_catalog = CString::new(catalog)?;
189        let c_schema = CString::new(schema)?;
190
191        let r = unsafe {
192            ffi::duckdb_appender_create_ext(
193                self.con,
194                c_catalog.as_ptr() as *const c_char,
195                c_schema.as_ptr() as *const c_char,
196                c_table.as_ptr() as *const c_char,
197                &mut c_app,
198            )
199        };
200        result_from_duckdb_appender(r, &mut c_app)?;
201        Ok(Appender::new(conn, c_app))
202    }
203
204    pub fn appender_with_columns<'a>(
205        &mut self,
206        conn: &'a Connection,
207        table: &str,
208        schema: &str,
209        catalog: Option<&str>,
210        columns: &[&str],
211    ) -> Result<Appender<'a>> {
212        // The C API only supports narrowing columns after the appender is created.
213        // Create the appender first, then activate the requested column subset.
214        let mut appender = match catalog {
215            Some(catalog) => self.appender_to_catalog_and_db(conn, table, catalog, schema)?,
216            None => self.appender(conn, table, schema)?,
217        };
218        for column in columns {
219            appender.add_column(column)?;
220        }
221        Ok(appender)
222    }
223
224    pub fn get_interrupt_handle(&self) -> Arc<InterruptHandle> {
225        self.interrupt.clone()
226    }
227
    /// Reports whether the connection is in autocommit mode.
    ///
    /// This implementation unconditionally returns `true`; it does not query
    /// the connection.
    #[inline]
    pub fn is_autocommit(&self) -> bool {
        true
    }
232}
233
234struct ExtractedStatementsGuard(ffi::duckdb_extracted_statements);
235
236impl Drop for ExtractedStatementsGuard {
237    fn drop(&mut self) {
238        unsafe { ffi::duckdb_destroy_extracted(&mut self.0) }
239    }
240}
241
242impl Drop for InnerConnection {
243    #[allow(unused_must_use)]
244    #[inline]
245    fn drop(&mut self) {
246        use std::thread::panicking;
247        if let Err(e) = self.close() {
248            if panicking() {
249                eprintln!("Error while closing DuckDB connection: {e:?}");
250            } else {
251                panic!("Error while closing DuckDB connection: {e:?}");
252            }
253        }
254    }
255}
256
257/// A handle that allows interrupting long-running queries.
258pub struct InterruptHandle {
259    conn: Mutex<ffi::duckdb_connection>,
260}
261
262unsafe impl Send for InterruptHandle {}
263unsafe impl Sync for InterruptHandle {}
264
265impl InterruptHandle {
266    fn new(conn: ffi::duckdb_connection) -> Self {
267        Self { conn: Mutex::new(conn) }
268    }
269
270    fn clear(&self) {
271        *(self.conn.lock().unwrap()) = ptr::null_mut();
272    }
273
274    /// Interrupt the query currently running on the connection this handle was
275    /// obtained from. The interrupt will cause that query to fail with
276    /// `Error::DuckDBFailure`. If the connection was dropped after obtaining
277    /// this interrupt handle, calling this method results in a noop.
278    ///
279    /// See [`crate::Connection::interrupt_handle`] for an example.
280    pub fn interrupt(&self) {
281        let db_handle = self.conn.lock().unwrap();
282
283        if !db_handle.is_null() {
284            unsafe {
285                ffi::duckdb_interrupt(*db_handle);
286            }
287        }
288    }
289}