1use std::{
2 ffi::{c_void, CStr, CString},
3 mem,
4 os::raw::c_char,
5 ptr, str,
6 sync::{Arc, Mutex},
7};
8
9use super::{ffi, Appender, Config, Connection, Result};
10use crate::{
11 error::{
12 result_from_duckdb_appender, result_from_duckdb_arrow, result_from_duckdb_extract, result_from_duckdb_prepare,
13 Error,
14 },
15 raw_statement::RawStatement,
16 statement::Statement,
17};
18
19pub struct InnerConnection {
20 pub db: ffi::duckdb_database,
21 pub con: ffi::duckdb_connection,
22 interrupt: Arc<InterruptHandle>,
23 owned: bool,
24}
25
26impl InnerConnection {
27 #[inline]
28 pub unsafe fn new(db: ffi::duckdb_database, owned: bool) -> Result<Self> {
29 let mut con: ffi::duckdb_connection = ptr::null_mut();
30 let r = ffi::duckdb_connect(db, &mut con);
31 if r != ffi::DuckDBSuccess {
32 ffi::duckdb_disconnect(&mut con);
33 return Err(Error::DuckDBFailure(
34 ffi::Error::new(r),
35 Some("connect error".to_owned()),
36 ));
37 }
38 let interrupt = Arc::new(InterruptHandle::new(con));
39
40 Ok(Self {
41 db,
42 con,
43 interrupt,
44 owned,
45 })
46 }
47
48 pub fn open_with_flags(c_path: &CStr, config: Config) -> Result<Self> {
49 unsafe {
50 let mut db: ffi::duckdb_database = ptr::null_mut();
51 let mut c_err = std::ptr::null_mut();
52 let r = ffi::duckdb_open_ext(c_path.as_ptr(), &mut db, config.duckdb_config(), &mut c_err);
53 if r != ffi::DuckDBSuccess {
54 let msg = Some(CStr::from_ptr(c_err).to_string_lossy().to_string());
55 ffi::duckdb_free(c_err as *mut c_void);
56 return Err(Error::DuckDBFailure(ffi::Error::new(r), msg));
57 }
58 Self::new(db, true)
59 }
60 }
61
62 pub fn close(&mut self) -> Result<()> {
63 if self.db.is_null() {
64 return Ok(());
65 }
66 if self.con.is_null() {
67 return Ok(());
68 }
69 unsafe {
70 ffi::duckdb_disconnect(&mut self.con);
71 self.con = ptr::null_mut();
72 self.interrupt.clear();
73
74 if self.owned {
75 ffi::duckdb_close(&mut self.db);
76 self.db = ptr::null_mut();
77 }
78 }
79 Ok(())
80 }
81
82 pub fn try_clone(&self) -> Result<Self> {
84 unsafe { Self::new(self.db, false) }
85 }
86
87 pub fn execute(&mut self, sql: &str) -> Result<()> {
88 let c_str = CString::new(sql)?;
89 unsafe {
90 let mut out = mem::zeroed();
91 let r = ffi::duckdb_query_arrow(self.con, c_str.as_ptr() as *const c_char, &mut out);
92 result_from_duckdb_arrow(r, out)?;
93 ffi::duckdb_destroy_arrow(&mut out);
94 Ok(())
95 }
96 }
97
98 pub fn prepare<'a>(&mut self, conn: &'a Connection, sql: &str) -> Result<Statement<'a>> {
99 let c_str = CString::new(sql)?;
100
101 let mut extracted = ptr::null_mut();
103 let num_stmts =
104 unsafe { ffi::duckdb_extract_statements(self.con, c_str.as_ptr() as *const c_char, &mut extracted) };
105 result_from_duckdb_extract(num_stmts, extracted)?;
106
107 let _guard = ExtractedStatementsGuard(extracted);
109
110 for i in 0..num_stmts - 1 {
112 self.execute_extracted_statement(extracted, i)?;
113 }
114
115 let final_stmt = self.prepare_extracted_statement(extracted, num_stmts - 1)?;
117 Ok(Statement::new(conn, unsafe { RawStatement::new(final_stmt) }))
118 }
119
120 fn prepare_extracted_statement(
121 &self,
122 extracted: ffi::duckdb_extracted_statements,
123 index: ffi::idx_t,
124 ) -> Result<ffi::duckdb_prepared_statement> {
125 let mut stmt = ptr::null_mut();
126 let res = unsafe { ffi::duckdb_prepare_extracted_statement(self.con, extracted, index, &mut stmt) };
127 result_from_duckdb_prepare(res, stmt)?;
128 Ok(stmt)
129 }
130
131 fn execute_extracted_statement(
132 &self,
133 extracted: ffi::duckdb_extracted_statements,
134 index: ffi::idx_t,
135 ) -> Result<()> {
136 let mut stmt = self.prepare_extracted_statement(extracted, index)?;
137
138 let mut result = unsafe { mem::zeroed() };
139 let rc = unsafe { ffi::duckdb_execute_prepared(stmt, &mut result) };
140
141 let error = if rc != ffi::DuckDBSuccess {
142 unsafe {
143 let c_err = ffi::duckdb_result_error(&mut result as *mut _);
144 let msg = if c_err.is_null() {
145 None
146 } else {
147 Some(CStr::from_ptr(c_err).to_string_lossy().to_string())
148 };
149 Some(Error::DuckDBFailure(ffi::Error::new(rc), msg))
150 }
151 } else {
152 None
153 };
154
155 unsafe {
156 ffi::duckdb_destroy_prepare(&mut stmt);
157 ffi::duckdb_destroy_result(&mut result);
158 }
159
160 error.map_or(Ok(()), Err)
161 }
162
163 pub fn appender<'a>(&mut self, conn: &'a Connection, table: &str, schema: &str) -> Result<Appender<'a>> {
164 let mut c_app: ffi::duckdb_appender = ptr::null_mut();
165 let c_table = CString::new(table)?;
166 let c_schema = CString::new(schema)?;
167 let r = unsafe {
168 ffi::duckdb_appender_create(
169 self.con,
170 c_schema.as_ptr() as *const c_char,
171 c_table.as_ptr() as *const c_char,
172 &mut c_app,
173 )
174 };
175 result_from_duckdb_appender(r, &mut c_app)?;
176 Ok(Appender::new(conn, c_app))
177 }
178
179 pub fn appender_to_catalog_and_db<'a>(
180 &mut self,
181 conn: &'a Connection,
182 table: &str,
183 catalog: &str,
184 schema: &str,
185 ) -> Result<Appender<'a>> {
186 let mut c_app: ffi::duckdb_appender = ptr::null_mut();
187 let c_table = CString::new(table)?;
188 let c_catalog = CString::new(catalog)?;
189 let c_schema = CString::new(schema)?;
190
191 let r = unsafe {
192 ffi::duckdb_appender_create_ext(
193 self.con,
194 c_catalog.as_ptr() as *const c_char,
195 c_schema.as_ptr() as *const c_char,
196 c_table.as_ptr() as *const c_char,
197 &mut c_app,
198 )
199 };
200 result_from_duckdb_appender(r, &mut c_app)?;
201 Ok(Appender::new(conn, c_app))
202 }
203
204 pub fn appender_with_columns<'a>(
205 &mut self,
206 conn: &'a Connection,
207 table: &str,
208 schema: &str,
209 catalog: Option<&str>,
210 columns: &[&str],
211 ) -> Result<Appender<'a>> {
212 let mut appender = match catalog {
215 Some(catalog) => self.appender_to_catalog_and_db(conn, table, catalog, schema)?,
216 None => self.appender(conn, table, schema)?,
217 };
218 for column in columns {
219 appender.add_column(column)?;
220 }
221 Ok(appender)
222 }
223
224 pub fn get_interrupt_handle(&self) -> Arc<InterruptHandle> {
225 self.interrupt.clone()
226 }
227
228 #[inline]
229 pub fn is_autocommit(&self) -> bool {
230 true
231 }
232}
233
234struct ExtractedStatementsGuard(ffi::duckdb_extracted_statements);
235
236impl Drop for ExtractedStatementsGuard {
237 fn drop(&mut self) {
238 unsafe { ffi::duckdb_destroy_extracted(&mut self.0) }
239 }
240}
241
242impl Drop for InnerConnection {
243 #[allow(unused_must_use)]
244 #[inline]
245 fn drop(&mut self) {
246 use std::thread::panicking;
247 if let Err(e) = self.close() {
248 if panicking() {
249 eprintln!("Error while closing DuckDB connection: {e:?}");
250 } else {
251 panic!("Error while closing DuckDB connection: {e:?}");
252 }
253 }
254 }
255}
256
257pub struct InterruptHandle {
259 conn: Mutex<ffi::duckdb_connection>,
260}
261
262unsafe impl Send for InterruptHandle {}
263unsafe impl Sync for InterruptHandle {}
264
265impl InterruptHandle {
266 fn new(conn: ffi::duckdb_connection) -> Self {
267 Self { conn: Mutex::new(conn) }
268 }
269
270 fn clear(&self) {
271 *(self.conn.lock().unwrap()) = ptr::null_mut();
272 }
273
274 pub fn interrupt(&self) {
281 let db_handle = self.conn.lock().unwrap();
282
283 if !db_handle.is_null() {
284 unsafe {
285 ffi::duckdb_interrupt(*db_handle);
286 }
287 }
288 }
289}