duckdb/
lib.rs

1//! duckdb-rs is an ergonomic wrapper for using DuckDB from Rust. It attempts to
2//! expose an interface similar to [rusqlite](https://github.com/rusqlite/rusqlite).
3//!
4//! ```rust
5//! use duckdb::{params, Connection, Result};
6//! use duckdb::arrow::record_batch::RecordBatch;
7//! use duckdb::arrow::util::pretty::print_batches;
8//!
9//! #[derive(Debug)]
10//! struct Person {
11//!     id: i32,
12//!     name: String,
13//!     data: Option<Vec<u8>>,
14//! }
15//!
16//! fn main() -> Result<()> {
17//!     let conn = Connection::open_in_memory()?;
18//!
19//!     conn.execute_batch(
20//!         r"CREATE SEQUENCE seq;
21//!           CREATE TABLE person (
22//!                   id              INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq'),
23//!                   name            TEXT NOT NULL,
24//!                   data            BLOB
25//!                   );
26//!          ")?;
27//!     let me = Person {
28//!         id: 0,
29//!         name: "Steven".to_string(),
30//!         data: None,
31//!     };
32//!     conn.execute(
33//!         "INSERT INTO person (name, data) VALUES (?, ?)",
34//!         params![me.name, me.data],
35//!     )?;
36//!
37//!     let mut stmt = conn.prepare("SELECT id, name, data FROM person")?;
38//!     let person_iter = stmt.query_map([], |row| {
39//!         Ok(Person {
40//!             id: row.get(0)?,
41//!             name: row.get(1)?,
42//!             data: row.get(2)?,
43//!         })
44//!     })?;
45//!
46//!     for person in person_iter {
47//!         println!("Found person {:?}", person.unwrap());
48//!     }
49//!
50//!     // query table by arrow
51//!     let rbs: Vec<RecordBatch> = stmt.query_arrow([])?.collect();
52//!     print_batches(&rbs);
53//!     Ok(())
54//! }
55//! ```
56#![warn(missing_docs)]
57
58pub use libduckdb_sys as ffi;
59
60use std::{
61    cell::RefCell,
62    convert,
63    ffi::CString,
64    fmt,
65    path::{Path, PathBuf},
66    result, str,
67};
68
69use crate::{cache::StatementCache, inner_connection::InnerConnection, raw_statement::RawStatement, types::ValueRef};
70
71#[cfg(feature = "r2d2")]
72pub use crate::r2d2::DuckdbConnectionManager;
73pub use crate::{
74    appender::Appender,
75    appender_params::{appender_params_from_iter, AppenderParams, AppenderParamsFromIter},
76    arrow_batch::{Arrow, ArrowStream},
77    cache::CachedStatement,
78    column::Column,
79    config::{AccessMode, Config, DefaultNullOrder, DefaultOrder},
80    error::Error,
81    ffi::ErrorCode,
82    inner_connection::InterruptHandle,
83    params::{params_from_iter, Params, ParamsFromIter},
84    row::{AndThenRows, Map, MappedRows, Row, RowIndex, Rows},
85    statement::Statement,
86    transaction::{DropBehavior, Transaction},
87    types::ToSql,
88};
89#[cfg(feature = "polars")]
90pub use polars_dataframe::Polars;
91
92// re-export dependencies to minimise version maintenance for crate users
93pub use arrow;
94#[cfg(feature = "polars")]
95pub use polars;
96#[cfg(feature = "polars")]
97pub use polars_arrow as arrow2;
98
99/// The core module contains the main functionality of the DuckDB crate.
100pub mod core;
101
102#[macro_use]
103mod error;
104mod appender;
105mod appender_params;
106mod arrow_batch;
107mod cache;
108mod column;
109mod config;
110mod inner_connection;
111mod params;
112#[cfg(feature = "polars")]
113mod polars_dataframe;
114mod pragma;
115#[cfg(feature = "r2d2")]
116mod r2d2;
117mod raw_statement;
118mod row;
119mod statement;
120mod transaction;
121
122#[cfg(feature = "extensions-full")]
123mod extension;
124
125pub mod types;
126/// The duckdb table function interface
127#[cfg(feature = "vtab")]
128pub mod vtab;
129
130/// The duckdb scalar function interface
131#[cfg(feature = "vscalar")]
132pub mod vscalar;
133
134#[cfg(test)]
135mod test_all_types;
136
137// Number of cached prepared statements we'll hold on to.
138const STATEMENT_CACHE_DEFAULT_CAPACITY: usize = 16;
139
140/// A macro making it more convenient to pass heterogeneous or long lists of
141/// parameters as a `&[&dyn ToSql]`.
142///
143/// # Example
144///
145/// ```rust,no_run
146/// # use duckdb::{Result, Connection, params};
147///
148/// struct Person {
149///     name: String,
150///     age_in_years: u8,
151///     data: Option<Vec<u8>>,
152/// }
153///
154/// fn add_person(conn: &Connection, person: &Person) -> Result<()> {
155///     conn.execute("INSERT INTO person (name, age_in_years, data)
156///                   VALUES (?1, ?2, ?3)",
157///                  params![person.name, person.age_in_years, person.data])?;
158///     Ok(())
159/// }
160/// ```
161#[macro_export]
162macro_rules! params {
163    () => {
164        &[] as &[&dyn $crate::ToSql]
165    };
166    ($($param:expr),+ $(,)?) => {
167        &[$(&$param as &dyn $crate::ToSql),+] as &[&dyn $crate::ToSql]
168    };
169}
170
171/// A typedef of the result returned by many methods.
172pub type Result<T, E = Error> = result::Result<T, E>;
173
174/// See the [method documentation](#tymethod.optional).
175pub trait OptionalExt<T> {
176    /// Converts a `Result<T>` into a `Result<Option<T>>`.
177    ///
178    /// By default, duckdb-rs treats 0 rows being returned from a query that is
179    /// expected to return 1 row as an error. This method will
180    /// handle that error, and give you back an `Option<T>` instead.
181    fn optional(self) -> Result<Option<T>>;
182}
183
184impl<T> OptionalExt<T> for Result<T> {
185    fn optional(self) -> Result<Option<T>> {
186        match self {
187            Ok(value) => Ok(Some(value)),
188            Err(Error::QueryReturnedNoRows) => Ok(None),
189            Err(e) => Err(e),
190        }
191    }
192}
193
194/// Name for a database within a DuckDB connection.
195#[derive(Copy, Clone, Debug)]
196pub enum DatabaseName<'a> {
197    /// The main database.
198    Main,
199
200    /// The temporary database (e.g., any "CREATE TEMPORARY TABLE" tables).
201    Temp,
202
203    /// A database that has been attached via "ATTACH DATABASE ...".
204    Attached(&'a str),
205}
206
207#[allow(clippy::needless_lifetimes)]
208impl<'a> fmt::Display for DatabaseName<'a> {
209    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210        match *self {
211            DatabaseName::Main => write!(f, "main"),
212            DatabaseName::Temp => write!(f, "temp"),
213            DatabaseName::Attached(s) => write!(f, "{s}"),
214        }
215    }
216}
217
218/// Shorthand for [`DatabaseName::Main`].
219pub const MAIN_DB: DatabaseName<'static> = DatabaseName::Main;
220
221/// Shorthand for [`DatabaseName::Temp`].
222pub const TEMP_DB: DatabaseName<'static> = DatabaseName::Temp;
223
224/// A connection to a DuckDB database.
225pub struct Connection {
226    db: RefCell<InnerConnection>,
227    cache: StatementCache,
228    path: Option<PathBuf>,
229}
230
231unsafe impl Send for Connection {}
232
233impl Connection {
234    /// Open a new connection to a DuckDB database.
235    ///
236    /// `Connection::open(path)` is equivalent to
237    /// `Connection::open_with_flags(path,
238    /// Config::default())`.
239    ///
240    /// ```rust,no_run
241    /// # use duckdb::{Connection, Result};
242    /// fn open_my_db() -> Result<()> {
243    ///     let path = "./my_db.db3";
244    ///     let db = Connection::open(&path)?;
245    ///     println!("{}", db.is_autocommit());
246    ///     Ok(())
247    /// }
248    /// ```
249    ///
250    /// # Failure
251    ///
252    /// Will return `Err` if `path` cannot be converted to a C-compatible
253    /// string or if the underlying DuckDB open call fails.
254    #[inline]
255    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
256        Self::open_with_flags(path, Config::default())
257    }
258
259    /// Open a new connection to an in-memory DuckDB database.
260    ///
261    /// # Failure
262    ///
263    /// Will return `Err` if the underlying DuckDB open call fails.
264    #[inline]
265    pub fn open_in_memory() -> Result<Self> {
266        Self::open_in_memory_with_flags(Config::default())
267    }
268
269    /// Open a new connection to an ffi database.
270    ///
271    /// # Failure
272    ///
273    /// Will return `Err` if the underlying DuckDB open call fails.
274    /// # Safety
275    ///
276    /// Need to pass in a valid db instance
277    #[inline]
278    pub unsafe fn open_from_raw(raw: ffi::duckdb_database) -> Result<Self> {
279        InnerConnection::new(raw, false).map(|db| Self {
280            db: RefCell::new(db),
281            cache: StatementCache::with_capacity(STATEMENT_CACHE_DEFAULT_CAPACITY),
282            path: None, // Can we know the path from connection?
283        })
284    }
285
286    /// Open a new connection to a DuckDB database.
287    ///
288    /// # Failure
289    ///
290    /// Will return `Err` if `path` cannot be converted to a C-compatible
291    /// string or if the underlying DuckDB open call fails.
292    #[inline]
293    pub fn open_with_flags<P: AsRef<Path>>(path: P, config: Config) -> Result<Self> {
294        #[cfg(unix)]
295        fn path_to_cstring(p: &Path) -> Result<CString> {
296            use std::os::unix::ffi::OsStrExt;
297            Ok(CString::new(p.as_os_str().as_bytes())?)
298        }
299
300        #[cfg(not(unix))]
301        fn path_to_cstring(p: &Path) -> Result<CString> {
302            let s = p.to_str().ok_or_else(|| Error::InvalidPath(p.to_owned()))?;
303            Ok(CString::new(s)?)
304        }
305
306        let c_path = path_to_cstring(path.as_ref())?;
307        let config = config.with("duckdb_api", "rust").unwrap();
308        InnerConnection::open_with_flags(&c_path, config).map(|db| Self {
309            db: RefCell::new(db),
310            cache: StatementCache::with_capacity(STATEMENT_CACHE_DEFAULT_CAPACITY),
311            path: Some(path.as_ref().to_path_buf()),
312        })
313    }
314
315    /// Open a new connection to an in-memory DuckDB database.
316    ///
317    /// # Failure
318    ///
319    /// Will return `Err` if the underlying DuckDB open call fails.
320    #[inline]
321    pub fn open_in_memory_with_flags(config: Config) -> Result<Self> {
322        Self::open_with_flags(":memory:", config)
323    }
324
325    /// Convenience method to run multiple SQL statements (that cannot take any
326    /// parameters).
327    ///
328    /// ## Example
329    ///
330    /// ```rust,no_run
331    /// # use duckdb::{Connection, Result};
332    /// fn create_tables(conn: &Connection) -> Result<()> {
333    ///     conn.execute_batch("BEGIN;
334    ///                         CREATE TABLE foo(x INTEGER);
335    ///                         CREATE TABLE bar(y TEXT);
336    ///                         COMMIT;",
337    ///     )
338    /// }
339    /// ```
340    ///
341    /// # Failure
342    ///
343    /// Will return `Err` if `sql` cannot be converted to a C-compatible string
344    /// or if the underlying DuckDB call fails.
345    pub fn execute_batch(&self, sql: &str) -> Result<()> {
346        self.db.borrow_mut().execute(sql)
347    }
348
349    /// Convenience method to prepare and execute a single SQL statement.
350    ///
351    /// On success, returns the number of rows that were changed or inserted or
352    /// deleted.
353    ///
354    /// ## Example
355    ///
356    /// ### With params
357    ///
358    /// ```rust,no_run
359    /// # use duckdb::{Connection};
360    /// fn update_rows(conn: &Connection) {
361    ///     match conn.execute("UPDATE foo SET bar = 'baz' WHERE qux = ?", [1i32]) {
362    ///         Ok(updated) => println!("{} rows were updated", updated),
363    ///         Err(err) => println!("update failed: {}", err),
364    ///     }
365    /// }
366    /// ```
367    ///
368    /// ### With params of varying types
369    ///
370    /// ```rust,no_run
371    /// # use duckdb::{Connection, params};
372    /// fn update_rows(conn: &Connection) {
373    ///     match conn.execute("UPDATE foo SET bar = ? WHERE qux = ?", params![&"baz", 1i32]) {
374    ///         Ok(updated) => println!("{} rows were updated", updated),
375    ///         Err(err) => println!("update failed: {}", err),
376    ///     }
377    /// }
378    /// ```
379    ///
380    /// # Failure
381    ///
382    /// Will return `Err` if `sql` cannot be converted to a C-compatible string
383    /// or if the underlying DuckDB call fails.
384    #[inline]
385    pub fn execute<P: Params>(&self, sql: &str, params: P) -> Result<usize> {
386        self.prepare(sql).and_then(|mut stmt| stmt.execute(params))
387    }
388
389    /// Returns the path to the database file, if one exists and is known.
390    #[inline]
391    pub fn path(&self) -> Option<&Path> {
392        self.path.as_deref()
393    }
394
395    /// Convenience method to execute a query that is expected to return a
396    /// single row.
397    ///
398    /// ## Example
399    ///
400    /// ```rust,no_run
401    /// # use duckdb::{Result, Connection};
402    /// fn preferred_locale(conn: &Connection) -> Result<String> {
403    ///     conn.query_row(
404    ///         "SELECT value FROM preferences WHERE name='locale'",
405    ///         [],
406    ///         |row| row.get(0),
407    ///     )
408    /// }
409    /// ```
410    ///
411    /// If the query returns more than one row, all rows except the first are
412    /// ignored.
413    ///
414    /// Returns `Err(QueryReturnedNoRows)` if no results are returned. If the
415    /// query truly is optional, you can call `.optional()` on the result of
416    /// this to get a `Result<Option<T>>`.
417    ///
418    /// # Failure
419    ///
420    /// Will return `Err` if `sql` cannot be converted to a C-compatible string
421    /// or if the underlying DuckDB call fails.
422    #[inline]
423    pub fn query_row<T, P, F>(&self, sql: &str, params: P, f: F) -> Result<T>
424    where
425        P: Params,
426        F: FnOnce(&Row<'_>) -> Result<T>,
427    {
428        self.prepare(sql)?.query_row(params, f)
429    }
430
431    /// Convenience method to execute a query that is expected to return a
432    /// single row, and execute a mapping via `f` on that returned row with
433    /// the possibility of failure. The `Result` type of `f` must implement
434    /// `std::convert::From<Error>`.
435    ///
436    /// ## Example
437    ///
438    /// ```rust,no_run
439    /// # use duckdb::{Result, Connection};
440    /// fn preferred_locale(conn: &Connection) -> Result<String> {
441    ///     conn.query_row_and_then(
442    ///         "SELECT value FROM preferences WHERE name='locale'",
443    ///         [],
444    ///         |row| row.get(0),
445    ///     )
446    /// }
447    /// ```
448    ///
449    /// If the query returns more than one row, all rows except the first are
450    /// ignored.
451    ///
452    /// # Failure
453    ///
454    /// Will return `Err` if `sql` cannot be converted to a C-compatible string
455    /// or if the underlying DuckDB call fails.
456    #[inline]
457    pub fn query_row_and_then<T, E, P, F>(&self, sql: &str, params: P, f: F) -> Result<T, E>
458    where
459        P: Params,
460        F: FnOnce(&Row<'_>) -> Result<T, E>,
461        E: convert::From<Error>,
462    {
463        self.prepare(sql)?
464            .query(params)?
465            .get_expected_row()
466            .map_err(E::from)
467            .and_then(f)
468    }
469
470    /// Prepare a SQL statement for execution.
471    ///
472    /// ## Example
473    ///
474    /// ```rust,no_run
475    /// # use duckdb::{Connection, Result};
476    /// fn insert_new_people(conn: &Connection) -> Result<()> {
477    ///     let mut stmt = conn.prepare("INSERT INTO People (name) VALUES (?)")?;
478    ///     stmt.execute(["Joe Smith"])?;
479    ///     stmt.execute(["Bob Jones"])?;
480    ///     Ok(())
481    /// }
482    /// ```
483    ///
484    /// # Failure
485    ///
486    /// Will return `Err` if `sql` cannot be converted to a C-compatible string
487    /// or if the underlying DuckDB call fails.
488    #[inline]
489    pub fn prepare(&self, sql: &str) -> Result<Statement<'_>> {
490        self.db.borrow_mut().prepare(self, sql)
491    }
492
493    /// Create an Appender for fast import data
494    /// default to use `DatabaseName::Main`
495    ///
496    /// ## Example
497    ///
498    /// ```rust,no_run
499    /// # use duckdb::{Connection, Result, params};
500    /// fn insert_rows(conn: &Connection) -> Result<()> {
501    ///     let mut app = conn.appender("foo")?;
502    ///     app.append_rows([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]])?;
503    ///     Ok(())
504    /// }
505    /// ```
506    ///
507    /// # Failure
508    ///
509    /// Will return `Err` if `table` not exists
510    pub fn appender(&self, table: &str) -> Result<Appender<'_>> {
511        self.appender_to_db(table, &DatabaseName::Main.to_string())
512    }
513
514    /// Create an Appender for fast import data
515    ///
516    /// ## Example
517    ///
518    /// ```rust,no_run
519    /// # use duckdb::{Connection, Result, params, DatabaseName};
520    /// fn insert_rows(conn: &Connection) -> Result<()> {
521    ///     let mut app = conn.appender_to_db("foo", &DatabaseName::Main.to_string())?;
522    ///     app.append_rows([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]])?;
523    ///     Ok(())
524    /// }
525    /// ```
526    ///
527    /// # Failure
528    ///
529    /// Will return `Err` if `table` not exists
530    pub fn appender_to_db(&self, table: &str, schema: &str) -> Result<Appender<'_>> {
531        self.db.borrow_mut().appender(self, table, schema)
532    }
533
534    /// Create an Appender for fast import data with provided catalog, schema and table
535    ///
536    /// ## Example
537    ///
538    /// ```rust,no_run
539    /// # use duckdb::{Connection, Result, params, DatabaseName};
540    /// fn insert_rows(conn: &Connection) -> Result<()> {
541    ///     let mut app = conn.appender_to_catalog_and_db("catalog", &DatabaseName::Main.to_string(), "foo")?;
542    ///     app.append_rows([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]])?;
543    ///     Ok(())
544    /// }
545    /// ```
546    ///
547    /// # Failure
548    ///
549    /// Will return `Err` if `catalog` or `schema` not exists
550    pub fn appender_to_catalog_and_db(&self, table: &str, catalog: &str, schema: &str) -> Result<Appender<'_>> {
551        self.db
552            .borrow_mut()
553            .appender_to_catalog_and_db(self, table, catalog, schema)
554    }
555
556    /// Create an Appender that only provides values for specific columns.
557    ///
558    /// Columns not in the list will use their DEFAULT value, or NULL if no default.
559    /// This supports all types of DEFAULT expressions including non-deterministic
560    /// ones like `random()`, `current_timestamp`, or sequences.
561    ///
562    /// ## Example
563    ///
564    /// ```rust,no_run
565    /// # use duckdb::{Connection, Result};
566    /// fn insert_partial(conn: &Connection) -> Result<()> {
567    ///     // Table: CREATE TABLE foo(id INT DEFAULT nextval('seq'), name TEXT, created TIMESTAMP DEFAULT current_timestamp)
568    ///     let mut app = conn.appender_with_columns("foo", &["name"])?;
569    ///     // Only provide name; id and created use their defaults
570    ///     app.append_row(["Alice"])?;
571    ///     app.append_row(["Bob"])?;
572    ///     Ok(())
573    /// }
574    /// ```
575    ///
576    /// # Failure
577    ///
578    /// Will return `Err` if `table` does not exist or a column name is invalid.
579    pub fn appender_with_columns(&self, table: &str, columns: &[&str]) -> Result<Appender<'_>> {
580        self.appender_with_columns_to_db(table, &DatabaseName::Main.to_string(), columns)
581    }
582
583    /// Create an Appender that only provides values for specific columns, with schema.
584    ///
585    /// See [`appender_with_columns`](Connection::appender_with_columns) for details.
586    pub fn appender_with_columns_to_db(&self, table: &str, schema: &str, columns: &[&str]) -> Result<Appender<'_>> {
587        self.db
588            .borrow_mut()
589            .appender_with_columns(self, table, schema, None, columns)
590    }
591
592    /// Create an Appender that only provides values for specific columns, with catalog and schema.
593    ///
594    /// See [`appender_with_columns`](Connection::appender_with_columns) for details.
595    pub fn appender_with_columns_to_catalog_and_db(
596        &self,
597        table: &str,
598        catalog: &str,
599        schema: &str,
600        columns: &[&str],
601    ) -> Result<Appender<'_>> {
602        self.db
603            .borrow_mut()
604            .appender_with_columns(self, table, schema, Some(catalog), columns)
605    }
606
607    /// Get a handle to interrupt long-running queries.
608    ///
609    /// ## Example
610    ///
611    /// ```rust,no_run
612    /// # use duckdb::{Connection, Result};
613    /// fn run_query(conn: Connection) -> Result<()> {
614    ///   let interrupt_handle = conn.interrupt_handle();
615    ///   let join_handle = std::thread::spawn(move || { conn.execute("expensive query", []) });
616    ///
617    ///   // Arbitrary wait for query to start
618    ///   std::thread::sleep(std::time::Duration::from_millis(100));
619    ///
620    ///   interrupt_handle.interrupt();
621    ///
622    ///   let query_result = join_handle.join().unwrap();
623    ///   assert!(query_result.is_err());
624    ///
625    ///   Ok(())
626    /// }
627    pub fn interrupt_handle(&self) -> std::sync::Arc<InterruptHandle> {
628        self.db.borrow().get_interrupt_handle()
629    }
630
631    /// Close the DuckDB connection.
632    ///
633    /// This is functionally equivalent to the `Drop` implementation for
634    /// `Connection` except that on failure, it returns an error and the
635    /// connection itself (presumably so closing can be attempted again).
636    ///
637    /// # Failure
638    ///
639    /// Will return `Err` if the underlying DuckDB call fails.
640    #[inline]
641    #[allow(clippy::result_large_err)]
642    pub fn close(self) -> Result<(), (Self, Error)> {
643        let r = self.db.borrow_mut().close();
644        r.map_err(move |err| (self, err))
645    }
646
647    /// Test for auto-commit mode.
648    /// Autocommit mode is on by default.
649    #[inline]
650    pub fn is_autocommit(&self) -> bool {
651        self.db.borrow().is_autocommit()
652    }
653
654    /// Creates a new connection to the already-opened database.
655    pub fn try_clone(&self) -> Result<Self> {
656        let inner = self.db.borrow().try_clone()?;
657        Ok(Self {
658            db: RefCell::new(inner),
659            cache: StatementCache::with_capacity(STATEMENT_CACHE_DEFAULT_CAPACITY),
660            path: self.path.clone(),
661        })
662    }
663
664    /// Returns the version of the DuckDB library
665    pub fn version(&self) -> Result<String> {
666        self.query_row("PRAGMA version", [], |row| row.get(0))
667    }
668}
669
670impl fmt::Debug for Connection {
671    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
672        f.debug_struct("Connection").field("path", &self.path).finish()
673    }
674}
675
676#[cfg(doctest)]
677doc_comment::doctest!("../../../README.md");
678
679#[cfg(test)]
680mod test {
681    use crate::types::Value;
682
683    use super::*;
684    use std::{error::Error as StdError, fmt};
685
686    use arrow::{array::Int32Array, datatypes::DataType, record_batch::RecordBatch};
687    use fallible_iterator::FallibleIterator;
688
689    // this function is never called, but is still type checked; in
690    // particular, calls with specific instantiations will require
691    // that those types are `Send`.
692    #[allow(dead_code, unconditional_recursion, clippy::extra_unused_type_parameters)]
693    fn ensure_send<T: Send>() {
694        ensure_send::<Connection>();
695    }
696
697    pub fn checked_memory_handle() -> Connection {
698        Connection::open_in_memory().unwrap()
699    }
700
701    #[test]
702    fn test_params_of_vary_types() -> Result<()> {
703        let db = checked_memory_handle();
704        let sql = "BEGIN;
705                   CREATE TABLE foo(bar TEXT, qux INTEGER);
706                   INSERT INTO foo VALUES ('baz', 1), ('baz', 2), ('baz', 3);
707                   END;";
708        db.execute_batch(sql)?;
709
710        let changed = db.execute("UPDATE foo SET qux = ? WHERE bar = ?", params![1i32, &"baz"])?;
711        assert_eq!(changed, 3);
712        Ok(())
713    }
714
715    #[test]
716    #[cfg_attr(windows, ignore = "Windows doesn't allow concurrent writes to a file")]
717    fn test_concurrent_transactions_busy_commit() -> Result<()> {
718        let tmp = tempfile::tempdir().unwrap();
719        let path = tmp.path().join("transactions.db3");
720
721        Connection::open(&path)?.execute_batch(
722            "
723            BEGIN;
724            CREATE TABLE foo(x INTEGER);
725            INSERT INTO foo VALUES(42);
726            END;",
727        )?;
728
729        let mut db1 =
730            Connection::open_with_flags(&path, Config::default().access_mode(config::AccessMode::ReadWrite)?)?;
731        let mut db2 =
732            Connection::open_with_flags(&path, Config::default().access_mode(config::AccessMode::ReadWrite)?)?;
733
734        {
735            let tx1 = db1.transaction()?;
736            let tx2 = db2.transaction()?;
737
738            // SELECT first makes sqlite lock with a shared lock
739            tx1.query_row("SELECT x FROM foo LIMIT 1", [], |_| Ok(()))?;
740            tx2.query_row("SELECT x FROM foo LIMIT 1", [], |_| Ok(()))?;
741
742            tx1.execute("INSERT INTO foo VALUES(?1)", [1])?;
743            let _ = tx2.execute("INSERT INTO foo VALUES(?1)", [2]);
744
745            let _ = tx1.commit();
746            let _ = tx2.commit();
747        }
748
749        let _ = db1.transaction().expect("commit should have closed transaction");
750        let _ = db2.transaction().expect("commit should have closed transaction");
751        Ok(())
752    }
753
754    #[test]
755    fn test_persistence() -> Result<()> {
756        let temp_dir = tempfile::tempdir().unwrap();
757        let path = temp_dir.path().join("test.db3");
758
759        {
760            let db = Connection::open(&path)?;
761            let sql = "BEGIN;
762                   CREATE TABLE foo(x INTEGER);
763                   INSERT INTO foo VALUES(42);
764                   END;";
765            db.execute_batch(sql)?;
766        }
767
768        let path_string = path.to_str().unwrap();
769        let db = Connection::open(path_string)?;
770        let the_answer: Result<i64> = db.query_row("SELECT x FROM foo", [], |r| r.get(0));
771
772        assert_eq!(42i64, the_answer?);
773        Ok(())
774    }
775
776    #[test]
777    fn test_open() {
778        let con = Connection::open_in_memory();
779        if con.is_err() {
780            panic!("open error {}", con.unwrap_err());
781        }
782        assert!(Connection::open_in_memory().is_ok());
783        let db = checked_memory_handle();
784        assert!(db.close().is_ok());
785        let _ = checked_memory_handle();
786        let _ = checked_memory_handle();
787    }
788
789    #[test]
790    fn test_open_from_raw() {
791        let con = Connection::open_in_memory();
792        assert!(con.is_ok());
793        let inner_con: InnerConnection = con.unwrap().db.into_inner();
794        unsafe {
795            assert!(Connection::open_from_raw(inner_con.db).is_ok());
796        }
797    }
798
799    #[test]
800    fn test_open_failure() -> Result<()> {
801        let filename = "no_such_file.db";
802        let result =
803            Connection::open_with_flags(filename, Config::default().access_mode(config::AccessMode::ReadOnly)?);
804        assert!(result.is_err());
805        let err = result.err().unwrap();
806        if let Error::DuckDBFailure(_e, Some(msg)) = err {
807            // TODO: update error code
808            // assert_eq!(ErrorCode::CannotOpen, e.code);
809            assert!(
810                msg.contains(filename),
811                "error message '{msg}' does not contain '{filename}'"
812            );
813        } else {
814            panic!("DuckDBFailure expected");
815        }
816        Ok(())
817    }
818
819    #[cfg(unix)]
820    #[test]
821    fn test_invalid_unicode_file_names() -> Result<()> {
822        use std::{ffi::OsStr, fs::File, os::unix::ffi::OsStrExt};
823        let temp_dir = tempfile::tempdir().unwrap();
824
825        let path = temp_dir.path();
826        if File::create(path.join(OsStr::from_bytes(&[0xFE]))).is_err() {
827            // Skip test, filesystem doesn't support invalid Unicode
828            return Ok(());
829        }
830        let db_path = path.join(OsStr::from_bytes(&[0xFF]));
831        {
832            let db = Connection::open(&db_path)?;
833            let sql = "BEGIN;
834                   CREATE TABLE foo(x INTEGER);
835                   INSERT INTO foo VALUES(42);
836                   END;";
837            db.execute_batch(sql)?;
838        }
839
840        let db = Connection::open(&db_path)?;
841        let the_answer: Result<i64> = db.query_row("SELECT x FROM foo", [], |r| r.get(0));
842
843        assert_eq!(42i64, the_answer?);
844        Ok(())
845    }
846
847    #[test]
848    fn test_close_always_ok() -> Result<()> {
849        let db = checked_memory_handle();
850
851        // TODO: prepare a query but not execute it
852
853        db.close().unwrap();
854        Ok(())
855    }
856
857    #[test]
858    fn test_execute_batch() -> Result<()> {
859        let db = checked_memory_handle();
860        let sql = "BEGIN;
861                   CREATE TABLE foo(x INTEGER);
862                   INSERT INTO foo VALUES(1);
863                   INSERT INTO foo VALUES(2);
864                   INSERT INTO foo VALUES(3);
865                   INSERT INTO foo VALUES(4);
866                   END;";
867        db.execute_batch(sql)?;
868
869        db.execute_batch("UPDATE foo SET x = 3 WHERE x < 3")?;
870
871        assert!(db.execute_batch("INVALID SQL").is_err());
872        Ok(())
873    }
874
875    #[test]
876    fn test_execute_single() -> Result<()> {
877        let db = checked_memory_handle();
878        db.execute_batch("CREATE TABLE foo(x INTEGER)")?;
879
880        assert_eq!(
881            3,
882            db.execute("INSERT INTO foo(x) VALUES (?), (?), (?)", [1i32, 2i32, 3i32])?
883        );
884        assert_eq!(1, db.execute("INSERT INTO foo(x) VALUES (?)", [4i32])?);
885
886        assert_eq!(
887            10i32,
888            db.query_row::<i32, _, _>("SELECT SUM(x) FROM foo", [], |r| r.get(0))?
889        );
890        Ok(())
891    }
892
893    #[test]
894    fn test_prepare_column_names() -> Result<()> {
895        let db = checked_memory_handle();
896        db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
897
898        let mut stmt = db.prepare("SELECT * FROM foo")?;
899        stmt.execute([])?;
900        assert_eq!(stmt.column_count(), 1);
901        assert_eq!(stmt.column_names(), vec!["x"]);
902
903        let mut stmt = db.prepare("SELECT x AS a, x AS b FROM foo")?;
904        stmt.execute([])?;
905        assert_eq!(stmt.column_count(), 2);
906        assert_eq!(stmt.column_names(), vec!["a", "b"]);
907        Ok(())
908    }
909
910    #[test]
911    fn test_prepare_execute() -> Result<()> {
912        let db = checked_memory_handle();
913        db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
914
915        let mut insert_stmt = db.prepare("INSERT INTO foo(x) VALUES(?)")?;
916        assert_eq!(insert_stmt.execute([1i32])?, 1);
917        assert_eq!(insert_stmt.execute([2i32])?, 1);
918        assert_eq!(insert_stmt.execute([3i32])?, 1);
919
920        assert!(insert_stmt.execute(["hello"]).is_err());
921        // NOTE: can't execute on errored stmt
922        // assert!(insert_stmt.execute(["goodbye"]).is_err());
923        // assert!(insert_stmt.execute([types::Null]).is_err());
924
925        let mut update_stmt = db.prepare("UPDATE foo SET x=? WHERE x<?")?;
926        assert_eq!(update_stmt.execute([3i32, 3i32])?, 2);
927        assert_eq!(update_stmt.execute([3i32, 3i32])?, 0);
928        assert_eq!(update_stmt.execute([8i32, 8i32])?, 3);
929        Ok(())
930    }
931
932    #[test]
933    fn test_prepare_query() -> Result<()> {
934        let db = checked_memory_handle();
935        db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
936
937        let mut insert_stmt = db.prepare("INSERT INTO foo(x) VALUES(?)")?;
938        assert_eq!(insert_stmt.execute([1i32])?, 1);
939        assert_eq!(insert_stmt.execute([2i32])?, 1);
940        assert_eq!(insert_stmt.execute([3i32])?, 1);
941
942        let mut query = db.prepare("SELECT x FROM foo WHERE x < ? ORDER BY x DESC")?;
943        {
944            let mut rows = query.query([4i32])?;
945            let mut v = Vec::<i32>::new();
946
947            while let Some(row) = rows.next()? {
948                v.push(row.get(0)?);
949            }
950
951            assert_eq!(v, [3i32, 2, 1]);
952        }
953
954        {
955            let mut rows = query.query([3i32])?;
956            let mut v = Vec::<i32>::new();
957
958            while let Some(row) = rows.next()? {
959                v.push(row.get(0)?);
960            }
961
962            assert_eq!(v, [2i32, 1]);
963        }
964        Ok(())
965    }
966
967    #[test]
968    fn test_query_map() -> Result<()> {
969        let db = checked_memory_handle();
970        let sql = "BEGIN;
971                   CREATE TABLE foo(x INTEGER, y TEXT);
972                   INSERT INTO foo VALUES(4, 'hello');
973                   INSERT INTO foo VALUES(3, ', ');
974                   INSERT INTO foo VALUES(2, 'world');
975                   INSERT INTO foo VALUES(1, '!');
976                   END;";
977        db.execute_batch(sql)?;
978
979        let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
980        let results: Result<Vec<String>> = query.query([])?.map(|row| row.get(1)).collect();
981
982        assert_eq!(results?.concat(), "hello, world!");
983        Ok(())
984    }
985
986    #[test]
987    fn test_query_row() -> Result<()> {
988        let db = checked_memory_handle();
989        let sql = "BEGIN;
990                   CREATE TABLE foo(x INTEGER);
991                   INSERT INTO foo VALUES(1);
992                   INSERT INTO foo VALUES(2);
993                   INSERT INTO foo VALUES(3);
994                   INSERT INTO foo VALUES(4);
995                   END;";
996        db.execute_batch(sql)?;
997
998        assert_eq!(
999            10i64,
1000            db.query_row::<i64, _, _>("SELECT SUM(x) FROM foo", [], |r| r.get(0))?
1001        );
1002
1003        let result: Result<i64> = db.query_row("SELECT x FROM foo WHERE x > 5", [], |r| r.get(0));
1004        match result.unwrap_err() {
1005            Error::QueryReturnedNoRows => (),
1006            err => panic!("Unexpected error {err}"),
1007        }
1008
1009        let bad_query_result = db.query_row("NOT A PROPER QUERY; test123", [], |_| Ok(()));
1010
1011        assert!(bad_query_result.is_err());
1012        Ok(())
1013    }
1014
1015    #[test]
1016    fn test_optional() -> Result<()> {
1017        let db = checked_memory_handle();
1018
1019        let result: Result<i64> = db.query_row("SELECT 1 WHERE 0 <> 0", [], |r| r.get(0));
1020        let result = result.optional();
1021        match result? {
1022            None => (),
1023            _ => panic!("Unexpected result"),
1024        }
1025
1026        let result: Result<i64> = db.query_row("SELECT 1 WHERE 0 == 0", [], |r| r.get(0));
1027        let result = result.optional();
1028        match result? {
1029            Some(1) => (),
1030            _ => panic!("Unexpected result"),
1031        }
1032
1033        let bad_query_result: Result<i64> = db.query_row("NOT A PROPER QUERY", [], |r| r.get(0));
1034        let bad_query_result = bad_query_result.optional();
1035        assert!(bad_query_result.is_err());
1036        Ok(())
1037    }
1038
1039    #[test]
1040    fn test_prepare_failures() -> Result<()> {
1041        let db = checked_memory_handle();
1042        db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
1043
1044        let _ = db.prepare("SELECT * FROM does_not_exist").unwrap_err();
1045        // assert!(format!("{}", err).contains("does_not_exist"));
1046        Ok(())
1047    }
1048
1049    #[test]
1050    fn test_is_autocommit() {
1051        let db = checked_memory_handle();
1052        assert!(db.is_autocommit(), "autocommit expected to be active by default");
1053    }
1054
1055    #[test]
1056    #[should_panic(expected = "not supported")]
1057    fn test_statement_debugging() {
1058        let db = checked_memory_handle();
1059        let query = "SELECT 12345";
1060        let stmt = db.prepare(query).unwrap();
1061
1062        assert!(format!("{stmt:?}").contains(query));
1063    }
1064
1065    #[test]
1066    fn test_notnull_constraint_error() -> Result<()> {
1067        let db = checked_memory_handle();
1068        db.execute_batch("CREATE TABLE foo(x TEXT NOT NULL)")?;
1069
1070        let result = db.execute("INSERT INTO foo (x) VALUES (NULL)", []);
1071        assert!(result.is_err());
1072
1073        match result.unwrap_err() {
1074            Error::DuckDBFailure(err, _) => {
1075                // TODO(wangfenjin): Update errorcode
1076                assert_eq!(err.code, ErrorCode::Unknown);
1077            }
1078            err => panic!("Unexpected error {err}"),
1079        }
1080        Ok(())
1081    }
1082
1083    #[test]
1084    fn test_clone() -> Result<()> {
1085        // 1. Drop the cloned connection first. The original connection should still be able to run queries.
1086        {
1087            let owned_con = checked_memory_handle();
1088            {
1089                let cloned_con = owned_con.try_clone().unwrap();
1090                cloned_con.execute_batch("create table test (c1 bigint)")?;
1091                cloned_con.close().unwrap();
1092            }
1093            owned_con.execute_batch("create table test2 (c1 bigint)")?;
1094            owned_con.close().unwrap();
1095        }
1096
1097        // 2. Close the original connection first. The cloned connection should still be able to run queries.
1098        {
1099            let cloned_con = {
1100                let owned_con = checked_memory_handle();
1101                let clone = owned_con.try_clone().unwrap();
1102                owned_con.execute_batch("create table test (c1 bigint)")?;
1103                owned_con.close().unwrap();
1104                clone
1105            };
1106            cloned_con.execute_batch("create table test2 (c1 bigint)")?;
1107            cloned_con.close().unwrap();
1108        }
1109        Ok(())
1110    }
1111
1112    mod query_and_then_tests {
1113        use super::*;
1114
1115        #[derive(Debug)]
1116        enum CustomError {
1117            SomeError,
1118            Sqlite(Error),
1119        }
1120
1121        impl fmt::Display for CustomError {
1122            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1123                match *self {
1124                    Self::SomeError => write!(f, "my custom error"),
1125                    Self::Sqlite(ref se) => write!(f, "my custom error: {se}"),
1126                }
1127            }
1128        }
1129
1130        impl StdError for CustomError {
1131            fn description(&self) -> &str {
1132                "my custom error"
1133            }
1134
1135            fn cause(&self) -> Option<&dyn StdError> {
1136                match *self {
1137                    Self::SomeError => None,
1138                    Self::Sqlite(ref se) => Some(se),
1139                }
1140            }
1141        }
1142
1143        impl From<Error> for CustomError {
1144            fn from(se: Error) -> Self {
1145                Self::Sqlite(se)
1146            }
1147        }
1148
1149        type CustomResult<T> = Result<T, CustomError>;
1150
1151        #[test]
1152        fn test_query_and_then() -> Result<()> {
1153            let db = checked_memory_handle();
1154            let sql = "BEGIN;
1155                       CREATE TABLE foo(x INTEGER, y TEXT);
1156                       INSERT INTO foo VALUES(4, 'hello');
1157                       INSERT INTO foo VALUES(3, ', ');
1158                       INSERT INTO foo VALUES(2, 'world');
1159                       INSERT INTO foo VALUES(1, '!');
1160                       END;";
1161            db.execute_batch(sql)?;
1162
1163            let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1164            let results: Result<Vec<String>> = query.query_and_then([], |row| row.get(1))?.collect();
1165
1166            assert_eq!(results?.concat(), "hello, world!");
1167            Ok(())
1168        }
1169
1170        #[test]
1171        fn test_query_and_then_fails() -> Result<()> {
1172            let db = checked_memory_handle();
1173            let sql = "BEGIN;
1174                       CREATE TABLE foo(x INTEGER, y TEXT);
1175                       INSERT INTO foo VALUES(4, 'hello');
1176                       INSERT INTO foo VALUES(3, ', ');
1177                       INSERT INTO foo VALUES(2, 'world');
1178                       INSERT INTO foo VALUES(1, '!');
1179                       END;";
1180            db.execute_batch(sql)?;
1181
1182            let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1183            let bad_type: Result<Vec<f64>> = query.query_and_then([], |row| row.get(1))?.collect();
1184
1185            match bad_type.unwrap_err() {
1186                Error::InvalidColumnType(..) => (),
1187                err => panic!("Unexpected error {err}"),
1188            }
1189
1190            let bad_idx: Result<Vec<String>> = query.query_and_then([], |row| row.get(3))?.collect();
1191
1192            match bad_idx.unwrap_err() {
1193                Error::InvalidColumnIndex(_) => (),
1194                err => panic!("Unexpected error {err}"),
1195            }
1196            Ok(())
1197        }
1198
1199        #[test]
1200        fn test_query_and_then_custom_error() -> CustomResult<()> {
1201            let db = checked_memory_handle();
1202            let sql = "BEGIN;
1203                       CREATE TABLE foo(x INTEGER, y TEXT);
1204                       INSERT INTO foo VALUES(4, 'hello');
1205                       INSERT INTO foo VALUES(3, ', ');
1206                       INSERT INTO foo VALUES(2, 'world');
1207                       INSERT INTO foo VALUES(1, '!');
1208                       END;";
1209            db.execute_batch(sql)?;
1210
1211            let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1212            let results: CustomResult<Vec<String>> = query
1213                .query_and_then([], |row| row.get(1).map_err(CustomError::Sqlite))?
1214                .collect();
1215
1216            assert_eq!(results?.concat(), "hello, world!");
1217            Ok(())
1218        }
1219
1220        #[test]
1221        fn test_query_and_then_custom_error_fails() -> Result<()> {
1222            let db = checked_memory_handle();
1223            let sql = "BEGIN;
1224                       CREATE TABLE foo(x INTEGER, y TEXT);
1225                       INSERT INTO foo VALUES(4, 'hello');
1226                       INSERT INTO foo VALUES(3, ', ');
1227                       INSERT INTO foo VALUES(2, 'world');
1228                       INSERT INTO foo VALUES(1, '!');
1229                       END;";
1230            db.execute_batch(sql)?;
1231
1232            let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1233            let bad_type: CustomResult<Vec<f64>> = query
1234                .query_and_then([], |row| row.get(1).map_err(CustomError::Sqlite))?
1235                .collect();
1236
1237            match bad_type.unwrap_err() {
1238                CustomError::Sqlite(Error::InvalidColumnType(..)) => (),
1239                err => panic!("Unexpected error {err}"),
1240            }
1241
1242            let bad_idx: CustomResult<Vec<String>> = query
1243                .query_and_then([], |row| row.get(3).map_err(CustomError::Sqlite))?
1244                .collect();
1245
1246            match bad_idx.unwrap_err() {
1247                CustomError::Sqlite(Error::InvalidColumnIndex(_)) => (),
1248                err => panic!("Unexpected error {err}"),
1249            }
1250
1251            let non_sqlite_err: CustomResult<Vec<String>> =
1252                query.query_and_then([], |_| Err(CustomError::SomeError))?.collect();
1253
1254            match non_sqlite_err.unwrap_err() {
1255                CustomError::SomeError => (),
1256                err => panic!("Unexpected error {err}"),
1257            }
1258            Ok(())
1259        }
1260
1261        #[test]
1262        fn test_query_row_and_then_custom_error() -> CustomResult<()> {
1263            let db = checked_memory_handle();
1264            let sql = "BEGIN;
1265                       CREATE TABLE foo(x INTEGER, y TEXT);
1266                       INSERT INTO foo VALUES(4, 'hello');
1267                       END;";
1268            db.execute_batch(sql)?;
1269
1270            let query = "SELECT x, y FROM foo ORDER BY x DESC";
1271            let results: CustomResult<String> =
1272                db.query_row_and_then(query, [], |row| row.get(1).map_err(CustomError::Sqlite));
1273
1274            assert_eq!(results?, "hello");
1275            Ok(())
1276        }
1277
1278        #[test]
1279        fn test_query_row_and_then_custom_error_fails() -> Result<()> {
1280            let db = checked_memory_handle();
1281            let sql = "BEGIN;
1282                       CREATE TABLE foo(x INTEGER, y TEXT);
1283                       INSERT INTO foo VALUES(4, 'hello');
1284                       END;";
1285            db.execute_batch(sql)?;
1286
1287            let query = "SELECT x, y FROM foo ORDER BY x DESC";
1288            let bad_type: CustomResult<f64> =
1289                db.query_row_and_then(query, [], |row| row.get(1).map_err(CustomError::Sqlite));
1290
1291            match bad_type.unwrap_err() {
1292                CustomError::Sqlite(Error::InvalidColumnType(..)) => (),
1293                err => panic!("Unexpected error {err}"),
1294            }
1295
1296            let bad_idx: CustomResult<String> =
1297                db.query_row_and_then(query, [], |row| row.get(3).map_err(CustomError::Sqlite));
1298
1299            match bad_idx.unwrap_err() {
1300                CustomError::Sqlite(Error::InvalidColumnIndex(_)) => (),
1301                err => panic!("Unexpected error {err}"),
1302            }
1303
1304            let non_sqlite_err: CustomResult<String> =
1305                db.query_row_and_then(query, [], |_| Err(CustomError::SomeError));
1306
1307            match non_sqlite_err.unwrap_err() {
1308                CustomError::SomeError => (),
1309                err => panic!("Unexpected error {err}"),
1310            }
1311            Ok(())
1312        }
1313
1314        #[test]
1315        fn test_rows_and_then_with_custom_error() -> Result<()> {
1316            let db = checked_memory_handle();
1317            db.execute_batch("CREATE TABLE test (value INTEGER)")?;
1318            db.execute_batch("INSERT INTO test VALUES (1), (3), (5)")?;
1319
1320            let mut stmt = db.prepare("SELECT value FROM test ORDER BY value")?;
1321            let rows = stmt.query([])?;
1322
1323            // Use and_then to apply custom validation with custom error type
1324            let results: Vec<i32> = rows
1325                .and_then(|row| -> CustomResult<i32> {
1326                    let val: i32 = row.get(0)?; // duckdb::Error automatically converted via From trait
1327                    if val > 10 {
1328                        Err(CustomError::SomeError) // Custom application-specific error
1329                    } else {
1330                        Ok(val)
1331                    }
1332                })
1333                .collect::<CustomResult<Vec<_>>>()
1334                .unwrap();
1335
1336            assert_eq!(results, vec![1, 3, 5]);
1337            Ok(())
1338        }
1339    }
1340
1341    #[test]
1342    fn test_dynamic() -> Result<()> {
1343        let db = checked_memory_handle();
1344        let sql = "BEGIN;
1345                       CREATE TABLE foo(x INTEGER, y TEXT);
1346                       INSERT INTO foo VALUES(4, 'hello');
1347                       END;";
1348        db.execute_batch(sql)?;
1349
1350        db.query_row("SELECT * FROM foo", [], |r| {
1351            assert_eq!(2, r.as_ref().column_count());
1352            Ok(())
1353        })
1354    }
1355    #[test]
1356    fn test_dyn_box() -> Result<()> {
1357        let db = checked_memory_handle();
1358        db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
1359        let b: Box<dyn ToSql> = Box::new(5);
1360        db.execute("INSERT INTO foo VALUES(?)", [b])?;
1361        db.query_row("SELECT x FROM foo", [], |r| {
1362            assert_eq!(5, r.get_unwrap::<_, i32>(0));
1363            Ok(())
1364        })
1365    }
1366
1367    #[test]
1368    fn test_alter_table() -> Result<()> {
1369        let db = checked_memory_handle();
1370        db.execute_batch("CREATE TABLE x(t INTEGER);")?;
1371        // `execute_batch` should be used but `execute` should also work
1372        db.execute("ALTER TABLE x RENAME TO y;", [])?;
1373        Ok(())
1374    }
1375
1376    #[test]
1377    fn test_query_arrow_record_batch_small() -> Result<()> {
1378        let db = checked_memory_handle();
1379        let sql = "BEGIN TRANSACTION;
1380                   CREATE TABLE test(t INTEGER);
1381                   INSERT INTO test VALUES (1); INSERT INTO test VALUES (2); INSERT INTO test VALUES (3); INSERT INTO test VALUES (4); INSERT INTO test VALUES (5);
1382                   END TRANSACTION;";
1383        db.execute_batch(sql)?;
1384        let mut stmt = db.prepare("select t from test order by t desc")?;
1385        let mut arr = stmt.query_arrow([])?;
1386
1387        let schema = arr.get_schema();
1388        assert_eq!(schema.fields().len(), 1);
1389        assert_eq!(schema.field(0).name(), "t");
1390        assert_eq!(schema.field(0).data_type(), &DataType::Int32);
1391
1392        let rb = arr.next().unwrap();
1393        let column = rb.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1394        assert_eq!(column.len(), 5);
1395        assert_eq!(column.value(0), 5);
1396        assert_eq!(column.value(1), 4);
1397        assert_eq!(column.value(2), 3);
1398        assert_eq!(column.value(3), 2);
1399        assert_eq!(column.value(4), 1);
1400
1401        assert!(arr.next().is_none());
1402        Ok(())
1403    }
1404
1405    #[test]
1406    fn test_query_arrow_record_batch_large() -> Result<()> {
1407        let db = checked_memory_handle();
1408        db.execute_batch("BEGIN TRANSACTION")?;
1409        db.execute_batch("CREATE TABLE test(t INTEGER);")?;
1410        for _ in 0..600 {
1411            db.execute_batch("INSERT INTO test VALUES (1); INSERT INTO test VALUES (2); INSERT INTO test VALUES (3); INSERT INTO test VALUES (4); INSERT INTO test VALUES (5);")?;
1412        }
1413        db.execute_batch("END TRANSACTION")?;
1414        let rbs: Vec<RecordBatch> = db.prepare("select t from test order by t")?.query_arrow([])?.collect();
1415        // batch size is not stable
1416        // assert_eq!(rbs.len(), 3);
1417        assert_eq!(rbs.iter().map(|rb| rb.num_rows()).sum::<usize>(), 3000);
1418        assert_eq!(
1419            rbs.iter()
1420                .map(|rb| rb
1421                    .column(0)
1422                    .as_any()
1423                    .downcast_ref::<Int32Array>()
1424                    .unwrap()
1425                    .iter()
1426                    .map(|i| i.unwrap())
1427                    .sum::<i32>())
1428                .sum::<i32>(),
1429            9000
1430        );
1431        Ok(())
1432    }
1433
1434    #[test]
1435    fn round_trip_interval() -> Result<()> {
1436        let db = checked_memory_handle();
1437        db.execute_batch("CREATE TABLE foo (t INTERVAL);")?;
1438
1439        let d = Value::Interval {
1440            months: 1,
1441            days: 2,
1442            nanos: 3,
1443        };
1444        db.execute("INSERT INTO foo VALUES (?)", [d])?;
1445
1446        let mut stmt = db.prepare("SELECT t FROM foo")?;
1447        let mut rows = stmt.query([])?;
1448        let row = rows.next()?.unwrap();
1449        let d: Value = row.get_unwrap(0);
1450        assert_eq!(d, d);
1451        Ok(())
1452    }
1453
1454    #[test]
1455    fn test_database_name_to_string() -> Result<()> {
1456        assert_eq!(DatabaseName::Main.to_string(), "main");
1457        assert_eq!(DatabaseName::Temp.to_string(), "temp");
1458        assert_eq!(DatabaseName::Attached("abc").to_string(), "abc");
1459        Ok(())
1460    }
1461
1462    #[test]
1463    fn test_interrupt() -> Result<()> {
1464        let db = checked_memory_handle();
1465        let db_interrupt = db.interrupt_handle();
1466
1467        let (tx, rx) = std::sync::mpsc::channel();
1468        std::thread::spawn(move || {
1469            let mut stmt = db
1470                .prepare("select count(*) from range(10000000) t1, range(1000000) t2")
1471                .unwrap();
1472            tx.send(stmt.execute([])).unwrap();
1473        });
1474
1475        std::thread::sleep(std::time::Duration::from_millis(100));
1476        db_interrupt.interrupt();
1477
1478        let result = rx.recv_timeout(std::time::Duration::from_secs(5)).unwrap();
1479        assert!(result.is_err_and(|err| err.to_string().contains("INTERRUPT")));
1480        Ok(())
1481    }
1482
1483    #[test]
1484    fn test_interrupt_on_dropped_db() {
1485        let db = checked_memory_handle();
1486        let db_interrupt = db.interrupt_handle();
1487
1488        drop(db);
1489        db_interrupt.interrupt();
1490    }
1491
1492    #[cfg(feature = "bundled")]
1493    #[test]
1494    fn test_version() -> Result<()> {
1495        let db = checked_memory_handle();
1496        let expected: String = format!("v{}", env!("CARGO_PKG_VERSION"));
1497        let actual = db.version()?;
1498        assert_eq!(expected, actual);
1499        Ok(())
1500    }
1501
1502    #[test]
1503    fn test_arrow_string_view_setting() -> Result<()> {
1504        // Test that only one setting doesn't work (missing arrow_output_version)
1505        {
1506            let config = Config::default().with("produce_arrow_string_view", "true")?;
1507            let conn = Connection::open_in_memory_with_flags(config)?;
1508
1509            let mut query = conn.prepare("SELECT 'test'::varchar AS str")?;
1510            let arrow = query.query_arrow([])?;
1511
1512            let batch = arrow.into_iter().next().expect("Expected at least one batch");
1513            assert_eq!(batch.schema().field(0).data_type(), &DataType::Utf8);
1514        }
1515
1516        {
1517            let config = Config::default()
1518                .with("produce_arrow_string_view", "true")?
1519                .with("arrow_output_version", "1.4")?;
1520            let conn = Connection::open_in_memory_with_flags(config)?;
1521
1522            let mut query = conn.prepare("SELECT 'test'::varchar AS str")?;
1523            let arrow = query.query_arrow([])?;
1524
1525            let batch = arrow.into_iter().next().expect("Expected at least one batch");
1526            assert_eq!(batch.schema().field(0).data_type(), &DataType::Utf8View);
1527        }
1528
1529        Ok(())
1530    }
1531
1532    #[test]
1533    fn test_prepare_multi_statement() -> Result<()> {
1534        let db = checked_memory_handle();
1535
1536        {
1537            let mut stmt =
1538                db.prepare("CREATE TABLE test(x INTEGER); INSERT INTO test VALUES (42); SELECT x FROM test;")?;
1539            let result: i32 = stmt.query_row([], |row| row.get(0))?;
1540            assert_eq!(result, 42);
1541        }
1542
1543        {
1544            let mut stmt = db.prepare(
1545                "CREATE TEMP TABLE temp_data(id INTEGER, value TEXT);
1546                INSERT INTO temp_data VALUES (1, 'first'), (2, 'second');
1547                SELECT COUNT(*) FROM temp_data;",
1548            )?;
1549            let count: i32 = stmt.query_row([], |row| row.get(0))?;
1550            assert_eq!(count, 2);
1551        }
1552
1553        Ok(())
1554    }
1555
1556    #[test]
1557    fn test_pivot_query() -> Result<()> {
1558        let db = checked_memory_handle();
1559
1560        db.execute_batch(
1561            "CREATE TABLE cities(city VARCHAR, year INTEGER, population INTEGER);
1562             INSERT INTO cities VALUES
1563               ('Amsterdam', 2000, 1005),
1564               ('Amsterdam', 2010, 1065),
1565               ('Amsterdam', 2020, 1158),
1566               ('Berlin', 2000, 3382),
1567               ('Berlin', 2010, 3460),
1568               ('Berlin', 2020, 3576);",
1569        )?;
1570
1571        // PIVOT queries internally expand to multiple statements
1572        let mut stmt = db.prepare("PIVOT cities ON year USING sum(population);")?;
1573        let mut rows = stmt.query([])?;
1574
1575        let mut row_count = 0;
1576        while let Some(_row) = rows.next()? {
1577            row_count += 1;
1578        }
1579        assert_eq!(row_count, 2);
1580
1581        Ok(())
1582    }
1583
1584    #[test]
1585    fn test_multiple_memory_databases() -> Result<()> {
1586        // Unnamed :memory: connections are isolated
1587        {
1588            let mem1 = Connection::open_in_memory()?;
1589            let mem2 = Connection::open_in_memory()?;
1590
1591            mem1.execute_batch("CREATE TABLE test (id INTEGER)")?;
1592            mem1.execute("INSERT INTO test VALUES (1)", [])?;
1593
1594            mem2.execute_batch("CREATE TABLE test (id INTEGER)")?;
1595            mem2.execute("INSERT INTO test VALUES (2)", [])?;
1596
1597            let value1: i32 = mem1.query_row("SELECT id FROM test", [], |r| r.get(0))?;
1598            assert_eq!(value1, 1);
1599
1600            let value2: i32 = mem2.query_row("SELECT id FROM test", [], |r| r.get(0))?;
1601            assert_eq!(value2, 2);
1602        }
1603
1604        // try_clone() shares the same database
1605        {
1606            let shared = Connection::open_in_memory()?;
1607
1608            shared.execute_batch("CREATE TABLE shared_table (id INTEGER)")?;
1609            shared.execute("INSERT INTO shared_table VALUES (123)", [])?;
1610
1611            let cloned = shared.try_clone()?;
1612
1613            // Cloned connection can see the original's tables
1614            let value: i32 = cloned.query_row("SELECT id FROM shared_table", [], |r| r.get(0))?;
1615            assert_eq!(value, 123);
1616
1617            cloned.execute("INSERT INTO shared_table VALUES (456)", [])?;
1618
1619            // Original connection can see cloned's insert
1620            let count: i64 = shared.query_row("SELECT COUNT(*) FROM shared_table", [], |r| r.get(0))?;
1621            assert_eq!(count, 2);
1622        }
1623
1624        Ok(())
1625    }
1626
1627    #[test]
1628    fn test_appender_with_catalog() -> Result<()> {
1629        let db = checked_memory_handle();
1630
1631        // Attach a new database to use as a catalog
1632        let temp_dir = tempfile::tempdir().unwrap();
1633        let attached_path = temp_dir.path().join("attached.db");
1634        db.execute_batch(&format!("ATTACH '{}' AS attached_db", attached_path.display()))?;
1635
1636        // Create a table in the attached database
1637        db.execute_batch("CREATE TABLE attached_db.main.test_table (id INTEGER, name TEXT)")?;
1638
1639        // Use appender with catalog
1640        {
1641            let mut app = db.appender_to_catalog_and_db("test_table", "attached_db", "main")?;
1642            app.append_row(params![1, "Alice"])?;
1643            app.append_row(params![2, "Bob"])?;
1644            app.append_row(params![3, "Charlie"])?;
1645        }
1646
1647        // Verify data was inserted into the correct table
1648        let count: i64 = db.query_row("SELECT COUNT(*) FROM attached_db.main.test_table", [], |r| r.get(0))?;
1649        assert_eq!(count, 3);
1650
1651        let name: String = db.query_row("SELECT name FROM attached_db.main.test_table WHERE id = ?", [2], |r| {
1652            r.get(0)
1653        })?;
1654        assert_eq!(name, "Bob");
1655
1656        Ok(())
1657    }
1658
1659    #[test]
1660    fn test_appender_with_catalog_multiple_schemas() -> Result<()> {
1661        let db = checked_memory_handle();
1662
1663        // Attach a new database
1664        let temp_dir = tempfile::tempdir().unwrap();
1665        let attached_path = temp_dir.path().join("multi_schema.db");
1666        db.execute_batch(&format!("ATTACH '{}' AS my_catalog", attached_path.display()))?;
1667
1668        // Create multiple schemas and tables
1669        db.execute_batch("CREATE SCHEMA my_catalog.schema1")?;
1670        db.execute_batch("CREATE SCHEMA my_catalog.schema2")?;
1671        db.execute_batch("CREATE TABLE my_catalog.schema1.data (value INTEGER)")?;
1672        db.execute_batch("CREATE TABLE my_catalog.schema2.data (value INTEGER)")?;
1673
1674        // Append to schema1
1675        {
1676            let mut app = db.appender_to_catalog_and_db("data", "my_catalog", "schema1")?;
1677            app.append_rows([[10], [20], [30]])?;
1678        }
1679
1680        // Append to schema2
1681        {
1682            let mut app = db.appender_to_catalog_and_db("data", "my_catalog", "schema2")?;
1683            app.append_rows([[100], [200]])?;
1684        }
1685
1686        // Verify data in schema1
1687        let sum1: i64 = db.query_row("SELECT SUM(value) FROM my_catalog.schema1.data", [], |r| r.get(0))?;
1688        assert_eq!(sum1, 60);
1689
1690        // Verify data in schema2
1691        let sum2: i64 = db.query_row("SELECT SUM(value) FROM my_catalog.schema2.data", [], |r| r.get(0))?;
1692        assert_eq!(sum2, 300);
1693
1694        Ok(())
1695    }
1696
1697    #[test]
1698    fn test_appender_with_catalog_main_vs_attached() -> Result<()> {
1699        let db = checked_memory_handle();
1700
1701        // Create table in main database
1702        db.execute_batch("CREATE TABLE test (id INTEGER)")?;
1703
1704        // Attach another database
1705        let temp_dir = tempfile::tempdir().unwrap();
1706        let attached_path = temp_dir.path().join("other.db");
1707        db.execute_batch(&format!("ATTACH '{}' AS other_db", attached_path.display()))?;
1708        db.execute_batch("CREATE TABLE other_db.main.test (id INTEGER)")?;
1709
1710        // Append to main catalog (memory)
1711        {
1712            let mut app = db.appender_to_catalog_and_db("test", "memory", "main")?;
1713            app.append_rows([[1], [2]])?;
1714        }
1715
1716        // Append to attached catalog
1717        {
1718            let mut app = db.appender_to_catalog_and_db("test", "other_db", "main")?;
1719            app.append_rows([[100], [200]])?;
1720        }
1721
1722        // Verify main database
1723        let count_main: i64 = db.query_row("SELECT COUNT(*) FROM test", [], |r| r.get(0))?;
1724        assert_eq!(count_main, 2);
1725
1726        // Verify attached database
1727        let count_attached: i64 = db.query_row("SELECT COUNT(*) FROM other_db.main.test", [], |r| r.get(0))?;
1728        assert_eq!(count_attached, 2);
1729
1730        Ok(())
1731    }
1732
1733    #[test]
1734    fn test_appender_with_catalog_error_invalid_catalog() -> Result<()> {
1735        let db = checked_memory_handle();
1736
1737        // Try to create appender with non-existent catalog
1738        let result = db.appender_to_catalog_and_db("test", "nonexistent_catalog", "main");
1739        assert!(result.is_err());
1740
1741        Ok(())
1742    }
1743
1744    #[test]
1745    fn test_appender_with_catalog_error_invalid_schema() -> Result<()> {
1746        let db = checked_memory_handle();
1747
1748        // Attach a database
1749        let temp_dir = tempfile::tempdir().unwrap();
1750        let attached_path = temp_dir.path().join("test.db");
1751        db.execute_batch(&format!("ATTACH '{}' AS my_db", attached_path.display()))?;
1752
1753        db.execute_batch("CREATE TABLE my_db.main.test (id INTEGER)")?;
1754
1755        // Try to create appender with non-existent schema
1756        let result = db.appender_to_catalog_and_db("test", "my_db", "nonexistent_schema");
1757        assert!(result.is_err());
1758
1759        Ok(())
1760    }
1761
1762    #[test]
1763    fn test_appender_with_catalog_flush() -> Result<()> {
1764        let db = checked_memory_handle();
1765
1766        // Attach database
1767        let temp_dir = tempfile::tempdir().unwrap();
1768        let attached_path = temp_dir.path().join("flush_test.db");
1769        db.execute_batch(&format!("ATTACH '{}' AS flush_db", attached_path.display()))?;
1770
1771        db.execute_batch("CREATE TABLE flush_db.main.test (id INTEGER)")?;
1772
1773        // Use appender with explicit flush
1774        {
1775            let mut app = db.appender_to_catalog_and_db("test", "flush_db", "main")?;
1776            app.append_row([1])?;
1777            app.append_row([2])?;
1778            app.flush()?;
1779            app.append_row([3])?;
1780            app.flush()?;
1781        }
1782
1783        // Verify all rows were flushed
1784        let count: i64 = db.query_row("SELECT COUNT(*) FROM flush_db.main.test", [], |r| r.get(0))?;
1785        assert_eq!(count, 3);
1786
1787        Ok(())
1788    }
1789}