1#![warn(missing_docs)]
57
58pub use libduckdb_sys as ffi;
59
60use std::{
61 cell::RefCell,
62 convert,
63 ffi::CString,
64 fmt,
65 path::{Path, PathBuf},
66 result, str,
67};
68
69use crate::{cache::StatementCache, inner_connection::InnerConnection, raw_statement::RawStatement, types::ValueRef};
70
71#[cfg(feature = "r2d2")]
72pub use crate::r2d2::DuckdbConnectionManager;
73pub use crate::{
74 appender::Appender,
75 appender_params::{appender_params_from_iter, AppenderParams, AppenderParamsFromIter},
76 arrow_batch::{Arrow, ArrowStream},
77 cache::CachedStatement,
78 column::Column,
79 config::{AccessMode, Config, DefaultNullOrder, DefaultOrder},
80 error::Error,
81 ffi::ErrorCode,
82 inner_connection::InterruptHandle,
83 params::{params_from_iter, Params, ParamsFromIter},
84 row::{AndThenRows, Map, MappedRows, Row, RowIndex, Rows},
85 statement::Statement,
86 transaction::{DropBehavior, Transaction},
87 types::ToSql,
88};
89#[cfg(feature = "polars")]
90pub use polars_dataframe::Polars;
91
92pub use arrow;
94#[cfg(feature = "polars")]
95pub use polars;
96#[cfg(feature = "polars")]
97pub use polars_arrow as arrow2;
98
// Public module exposed to crate users.
pub mod core;

// `#[macro_use]` makes macros defined in `error` usable by the modules declared after it,
// which is why it is listed first.
#[macro_use]
mod error;
mod appender;
mod appender_params;
mod arrow_batch;
mod cache;
mod column;
mod config;
mod inner_connection;
mod params;
// Compiled only when the `polars` feature is enabled.
#[cfg(feature = "polars")]
mod polars_dataframe;
mod pragma;
// Compiled only when the `r2d2` feature is enabled.
#[cfg(feature = "r2d2")]
mod r2d2;
mod raw_statement;
mod row;
mod statement;
mod transaction;

// Compiled only when the `extensions-full` feature is enabled.
#[cfg(feature = "extensions-full")]
mod extension;

pub mod types;
// Public, but compiled only when the `vtab` feature is enabled.
#[cfg(feature = "vtab")]
pub mod vtab;

// Public, but compiled only when the `vscalar` feature is enabled.
#[cfg(feature = "vscalar")]
pub mod vscalar;

// Test-only module.
#[cfg(test)]
mod test_all_types;
136
/// Capacity handed to `StatementCache::with_capacity` whenever a `Connection` is constructed
/// in this file.
const STATEMENT_CACHE_DEFAULT_CAPACITY: usize = 16;
139
/// Builds a `&[&dyn ToSql]` slice from a comma-separated list of expressions.
///
/// * `params![]` expands to an empty slice.
/// * `params![a, b, ...]` borrows every expression and casts it to `&dyn ToSql`.
///   A trailing comma is accepted.
#[macro_export]
macro_rules! params {
    () => {
        &[] as &[&dyn $crate::ToSql]
    };
    ($($param:expr),+ $(,)?) => {
        &[$(&$param as &dyn $crate::ToSql),+] as &[&dyn $crate::ToSql]
    };
}
170
/// Crate-wide result alias whose error type defaults to [`Error`].
pub type Result<T, E = Error> = result::Result<T, E>;
173
/// Extension trait for turning a "no rows" outcome into `None`.
pub trait OptionalExt<T> {
    /// Converts `self` into a `Result<Option<T>>`.
    ///
    /// The implementation in this file maps `Err(Error::QueryReturnedNoRows)` to `Ok(None)`,
    /// wraps a successful value in `Some`, and passes every other error through.
    fn optional(self) -> Result<Option<T>>;
}
183
184impl<T> OptionalExt<T> for Result<T> {
185 fn optional(self) -> Result<Option<T>> {
186 match self {
187 Ok(value) => Ok(Some(value)),
188 Err(Error::QueryReturnedNoRows) => Ok(None),
189 Err(e) => Err(e),
190 }
191 }
192}
193
/// Name of a database as seen by a connection.
#[derive(Copy, Clone, Debug)]
pub enum DatabaseName<'a> {
    /// The main database; displayed as `main`.
    Main,

    /// The temporary database; displayed as `temp`.
    Temp,

    /// A database identified by a caller-supplied name; displayed as that name verbatim.
    Attached(&'a str),
}

#[allow(clippy::needless_lifetimes)]
impl<'a> fmt::Display for DatabaseName<'a> {
    /// Writes `main`, `temp`, or the attached name, with no extra formatting applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let rendered = match *self {
            DatabaseName::Main => "main",
            DatabaseName::Temp => "temp",
            DatabaseName::Attached(attached) => attached,
        };
        f.write_str(rendered)
    }
}
217
/// Shorthand for [`DatabaseName::Main`].
pub const MAIN_DB: DatabaseName<'static> = DatabaseName::Main;

/// Shorthand for [`DatabaseName::Temp`].
pub const TEMP_DB: DatabaseName<'static> = DatabaseName::Temp;
223
/// A connection to a DuckDB database.
pub struct Connection {
    // Inner connection state. The `RefCell` lets `&self` methods borrow it mutably.
    db: RefCell<InnerConnection>,
    // Statement cache, created with `STATEMENT_CACHE_DEFAULT_CAPACITY` by every constructor here.
    cache: StatementCache,
    // Path the connection was opened with; `None` when it was built from a raw handle.
    path: Option<PathBuf>,
}
230
// SAFETY: NOTE(review) — this asserts that a `Connection` may be moved to another thread.
// Soundness depends on `InnerConnection` and `StatementCache`, which are defined elsewhere;
// confirm they hold nothing thread-affine. No `Sync` impl is declared here.
unsafe impl Send for Connection {}
232
233impl Connection {
234 #[inline]
255 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
256 Self::open_with_flags(path, Config::default())
257 }
258
259 #[inline]
265 pub fn open_in_memory() -> Result<Self> {
266 Self::open_in_memory_with_flags(Config::default())
267 }
268
269 #[inline]
278 pub unsafe fn open_from_raw(raw: ffi::duckdb_database) -> Result<Self> {
279 InnerConnection::new(raw, false).map(|db| Self {
280 db: RefCell::new(db),
281 cache: StatementCache::with_capacity(STATEMENT_CACHE_DEFAULT_CAPACITY),
282 path: None, })
284 }
285
286 #[inline]
293 pub fn open_with_flags<P: AsRef<Path>>(path: P, config: Config) -> Result<Self> {
294 #[cfg(unix)]
295 fn path_to_cstring(p: &Path) -> Result<CString> {
296 use std::os::unix::ffi::OsStrExt;
297 Ok(CString::new(p.as_os_str().as_bytes())?)
298 }
299
300 #[cfg(not(unix))]
301 fn path_to_cstring(p: &Path) -> Result<CString> {
302 let s = p.to_str().ok_or_else(|| Error::InvalidPath(p.to_owned()))?;
303 Ok(CString::new(s)?)
304 }
305
306 let c_path = path_to_cstring(path.as_ref())?;
307 let config = config.with("duckdb_api", "rust").unwrap();
308 InnerConnection::open_with_flags(&c_path, config).map(|db| Self {
309 db: RefCell::new(db),
310 cache: StatementCache::with_capacity(STATEMENT_CACHE_DEFAULT_CAPACITY),
311 path: Some(path.as_ref().to_path_buf()),
312 })
313 }
314
315 #[inline]
321 pub fn open_in_memory_with_flags(config: Config) -> Result<Self> {
322 Self::open_with_flags(":memory:", config)
323 }
324
325 pub fn execute_batch(&self, sql: &str) -> Result<()> {
346 self.db.borrow_mut().execute(sql)
347 }
348
349 #[inline]
385 pub fn execute<P: Params>(&self, sql: &str, params: P) -> Result<usize> {
386 self.prepare(sql).and_then(|mut stmt| stmt.execute(params))
387 }
388
389 #[inline]
391 pub fn path(&self) -> Option<&Path> {
392 self.path.as_deref()
393 }
394
395 #[inline]
423 pub fn query_row<T, P, F>(&self, sql: &str, params: P, f: F) -> Result<T>
424 where
425 P: Params,
426 F: FnOnce(&Row<'_>) -> Result<T>,
427 {
428 self.prepare(sql)?.query_row(params, f)
429 }
430
431 #[inline]
457 pub fn query_row_and_then<T, E, P, F>(&self, sql: &str, params: P, f: F) -> Result<T, E>
458 where
459 P: Params,
460 F: FnOnce(&Row<'_>) -> Result<T, E>,
461 E: convert::From<Error>,
462 {
463 self.prepare(sql)?
464 .query(params)?
465 .get_expected_row()
466 .map_err(E::from)
467 .and_then(f)
468 }
469
470 #[inline]
489 pub fn prepare(&self, sql: &str) -> Result<Statement<'_>> {
490 self.db.borrow_mut().prepare(self, sql)
491 }
492
493 pub fn appender(&self, table: &str) -> Result<Appender<'_>> {
511 self.appender_to_db(table, &DatabaseName::Main.to_string())
512 }
513
514 pub fn appender_to_db(&self, table: &str, schema: &str) -> Result<Appender<'_>> {
531 self.db.borrow_mut().appender(self, table, schema)
532 }
533
534 pub fn appender_to_catalog_and_db(&self, table: &str, catalog: &str, schema: &str) -> Result<Appender<'_>> {
551 self.db
552 .borrow_mut()
553 .appender_to_catalog_and_db(self, table, catalog, schema)
554 }
555
556 pub fn appender_with_columns(&self, table: &str, columns: &[&str]) -> Result<Appender<'_>> {
580 self.appender_with_columns_to_db(table, &DatabaseName::Main.to_string(), columns)
581 }
582
583 pub fn appender_with_columns_to_db(&self, table: &str, schema: &str, columns: &[&str]) -> Result<Appender<'_>> {
587 self.db
588 .borrow_mut()
589 .appender_with_columns(self, table, schema, None, columns)
590 }
591
592 pub fn appender_with_columns_to_catalog_and_db(
596 &self,
597 table: &str,
598 catalog: &str,
599 schema: &str,
600 columns: &[&str],
601 ) -> Result<Appender<'_>> {
602 self.db
603 .borrow_mut()
604 .appender_with_columns(self, table, schema, Some(catalog), columns)
605 }
606
607 pub fn interrupt_handle(&self) -> std::sync::Arc<InterruptHandle> {
628 self.db.borrow().get_interrupt_handle()
629 }
630
631 #[inline]
641 #[allow(clippy::result_large_err)]
642 pub fn close(self) -> Result<(), (Self, Error)> {
643 let r = self.db.borrow_mut().close();
644 r.map_err(move |err| (self, err))
645 }
646
647 #[inline]
650 pub fn is_autocommit(&self) -> bool {
651 self.db.borrow().is_autocommit()
652 }
653
654 pub fn try_clone(&self) -> Result<Self> {
656 let inner = self.db.borrow().try_clone()?;
657 Ok(Self {
658 db: RefCell::new(inner),
659 cache: StatementCache::with_capacity(STATEMENT_CACHE_DEFAULT_CAPACITY),
660 path: self.path.clone(),
661 })
662 }
663
664 pub fn version(&self) -> Result<String> {
666 self.query_row("PRAGMA version", [], |row| row.get(0))
667 }
668}
669
670impl fmt::Debug for Connection {
671 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
672 f.debug_struct("Connection").field("path", &self.path).finish()
673 }
674}
675
// Only compiled under `cfg(doctest)`: passes the README at this relative path to
// `doc_comment::doctest!` (presumably so its code examples run as doctests — confirm).
#[cfg(doctest)]
doc_comment::doctest!("../../../README.md");
678
679#[cfg(test)]
680mod test {
681 use crate::types::Value;
682
683 use super::*;
684 use std::{error::Error as StdError, fmt};
685
686 use arrow::{array::Int32Array, datatypes::DataType, record_batch::RecordBatch};
687 use fallible_iterator::FallibleIterator;
688
    // Compile-time check only: this function is never called, but its body is still
    // type-checked, so instantiating it with `Connection` fails to compile unless
    // `Connection: Send`. The lints are silenced because the recursion and the unused
    // type parameter are intentional.
    #[allow(dead_code, unconditional_recursion, clippy::extra_unused_type_parameters)]
    fn ensure_send<T: Send>() {
        ensure_send::<Connection>();
    }
696
    /// Test helper: opens an in-memory connection and panics if opening fails.
    pub fn checked_memory_handle() -> Connection {
        Connection::open_in_memory().unwrap()
    }
700
701 #[test]
702 fn test_params_of_vary_types() -> Result<()> {
703 let db = checked_memory_handle();
704 let sql = "BEGIN;
705 CREATE TABLE foo(bar TEXT, qux INTEGER);
706 INSERT INTO foo VALUES ('baz', 1), ('baz', 2), ('baz', 3);
707 END;";
708 db.execute_batch(sql)?;
709
710 let changed = db.execute("UPDATE foo SET qux = ? WHERE bar = ?", params![1i32, &"baz"])?;
711 assert_eq!(changed, 3);
712 Ok(())
713 }
714
    /// Two connections to the same file each run a transaction; afterwards both must be
    /// able to start a new transaction, whatever the outcome of the competing writes.
    #[test]
    #[cfg_attr(windows, ignore = "Windows doesn't allow concurrent writes to a file")]
    fn test_concurrent_transactions_busy_commit() -> Result<()> {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("transactions.db3");

        // Seed the file with one table and one row using a short-lived connection.
        Connection::open(&path)?.execute_batch(
            "
            BEGIN;
            CREATE TABLE foo(x INTEGER);
            INSERT INTO foo VALUES(42);
            END;",
        )?;

        let mut db1 =
            Connection::open_with_flags(&path, Config::default().access_mode(config::AccessMode::ReadWrite)?)?;
        let mut db2 =
            Connection::open_with_flags(&path, Config::default().access_mode(config::AccessMode::ReadWrite)?)?;

        {
            let tx1 = db1.transaction()?;
            let tx2 = db2.transaction()?;

            // Both transactions read before either one writes.
            tx1.query_row("SELECT x FROM foo LIMIT 1", [], |_| Ok(()))?;
            tx2.query_row("SELECT x FROM foo LIMIT 1", [], |_| Ok(()))?;

            tx1.execute("INSERT INTO foo VALUES(?1)", [1])?;
            // The second writer's result is deliberately ignored.
            let _ = tx2.execute("INSERT INTO foo VALUES(?1)", [2]);

            // Commit results are ignored as well; only the state afterwards is checked.
            let _ = tx1.commit();
            let _ = tx2.commit();
        }

        let _ = db1.transaction().expect("commit should have closed transaction");
        let _ = db2.transaction().expect("commit should have closed transaction");
        Ok(())
    }
753
754 #[test]
755 fn test_persistence() -> Result<()> {
756 let temp_dir = tempfile::tempdir().unwrap();
757 let path = temp_dir.path().join("test.db3");
758
759 {
760 let db = Connection::open(&path)?;
761 let sql = "BEGIN;
762 CREATE TABLE foo(x INTEGER);
763 INSERT INTO foo VALUES(42);
764 END;";
765 db.execute_batch(sql)?;
766 }
767
768 let path_string = path.to_str().unwrap();
769 let db = Connection::open(path_string)?;
770 let the_answer: Result<i64> = db.query_row("SELECT x FROM foo", [], |r| r.get(0));
771
772 assert_eq!(42i64, the_answer?);
773 Ok(())
774 }
775
776 #[test]
777 fn test_open() {
778 let con = Connection::open_in_memory();
779 if con.is_err() {
780 panic!("open error {}", con.unwrap_err());
781 }
782 assert!(Connection::open_in_memory().is_ok());
783 let db = checked_memory_handle();
784 assert!(db.close().is_ok());
785 let _ = checked_memory_handle();
786 let _ = checked_memory_handle();
787 }
788
    /// A `Connection` can be built on top of the raw database handle of another connection.
    #[test]
    fn test_open_from_raw() {
        let con = Connection::open_in_memory();
        assert!(con.is_ok());
        // Take the inner connection out; it stays alive until the end of this function.
        let inner_con: InnerConnection = con.unwrap().db.into_inner();
        // SAFETY: NOTE(review) — `inner_con.db` comes from a connection opened just above and
        // `inner_con` outlives the call; confirm this meets `open_from_raw`'s requirements.
        unsafe {
            assert!(Connection::open_from_raw(inner_con.db).is_ok());
        }
    }
798
799 #[test]
800 fn test_open_failure() -> Result<()> {
801 let filename = "no_such_file.db";
802 let result =
803 Connection::open_with_flags(filename, Config::default().access_mode(config::AccessMode::ReadOnly)?);
804 assert!(result.is_err());
805 let err = result.err().unwrap();
806 if let Error::DuckDBFailure(_e, Some(msg)) = err {
807 assert!(
810 msg.contains(filename),
811 "error message '{msg}' does not contain '{filename}'"
812 );
813 } else {
814 panic!("DuckDBFailure expected");
815 }
816 Ok(())
817 }
818
819 #[cfg(unix)]
820 #[test]
821 fn test_invalid_unicode_file_names() -> Result<()> {
822 use std::{ffi::OsStr, fs::File, os::unix::ffi::OsStrExt};
823 let temp_dir = tempfile::tempdir().unwrap();
824
825 let path = temp_dir.path();
826 if File::create(path.join(OsStr::from_bytes(&[0xFE]))).is_err() {
827 return Ok(());
829 }
830 let db_path = path.join(OsStr::from_bytes(&[0xFF]));
831 {
832 let db = Connection::open(&db_path)?;
833 let sql = "BEGIN;
834 CREATE TABLE foo(x INTEGER);
835 INSERT INTO foo VALUES(42);
836 END;";
837 db.execute_batch(sql)?;
838 }
839
840 let db = Connection::open(&db_path)?;
841 let the_answer: Result<i64> = db.query_row("SELECT x FROM foo", [], |r| r.get(0));
842
843 assert_eq!(42i64, the_answer?);
844 Ok(())
845 }
846
847 #[test]
848 fn test_close_always_ok() -> Result<()> {
849 let db = checked_memory_handle();
850
851 db.close().unwrap();
854 Ok(())
855 }
856
857 #[test]
858 fn test_execute_batch() -> Result<()> {
859 let db = checked_memory_handle();
860 let sql = "BEGIN;
861 CREATE TABLE foo(x INTEGER);
862 INSERT INTO foo VALUES(1);
863 INSERT INTO foo VALUES(2);
864 INSERT INTO foo VALUES(3);
865 INSERT INTO foo VALUES(4);
866 END;";
867 db.execute_batch(sql)?;
868
869 db.execute_batch("UPDATE foo SET x = 3 WHERE x < 3")?;
870
871 assert!(db.execute_batch("INVALID SQL").is_err());
872 Ok(())
873 }
874
875 #[test]
876 fn test_execute_single() -> Result<()> {
877 let db = checked_memory_handle();
878 db.execute_batch("CREATE TABLE foo(x INTEGER)")?;
879
880 assert_eq!(
881 3,
882 db.execute("INSERT INTO foo(x) VALUES (?), (?), (?)", [1i32, 2i32, 3i32])?
883 );
884 assert_eq!(1, db.execute("INSERT INTO foo(x) VALUES (?)", [4i32])?);
885
886 assert_eq!(
887 10i32,
888 db.query_row::<i32, _, _>("SELECT SUM(x) FROM foo", [], |r| r.get(0))?
889 );
890 Ok(())
891 }
892
893 #[test]
894 fn test_prepare_column_names() -> Result<()> {
895 let db = checked_memory_handle();
896 db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
897
898 let mut stmt = db.prepare("SELECT * FROM foo")?;
899 stmt.execute([])?;
900 assert_eq!(stmt.column_count(), 1);
901 assert_eq!(stmt.column_names(), vec!["x"]);
902
903 let mut stmt = db.prepare("SELECT x AS a, x AS b FROM foo")?;
904 stmt.execute([])?;
905 assert_eq!(stmt.column_count(), 2);
906 assert_eq!(stmt.column_names(), vec!["a", "b"]);
907 Ok(())
908 }
909
910 #[test]
911 fn test_prepare_execute() -> Result<()> {
912 let db = checked_memory_handle();
913 db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
914
915 let mut insert_stmt = db.prepare("INSERT INTO foo(x) VALUES(?)")?;
916 assert_eq!(insert_stmt.execute([1i32])?, 1);
917 assert_eq!(insert_stmt.execute([2i32])?, 1);
918 assert_eq!(insert_stmt.execute([3i32])?, 1);
919
920 assert!(insert_stmt.execute(["hello"]).is_err());
921 let mut update_stmt = db.prepare("UPDATE foo SET x=? WHERE x<?")?;
926 assert_eq!(update_stmt.execute([3i32, 3i32])?, 2);
927 assert_eq!(update_stmt.execute([3i32, 3i32])?, 0);
928 assert_eq!(update_stmt.execute([8i32, 8i32])?, 3);
929 Ok(())
930 }
931
932 #[test]
933 fn test_prepare_query() -> Result<()> {
934 let db = checked_memory_handle();
935 db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
936
937 let mut insert_stmt = db.prepare("INSERT INTO foo(x) VALUES(?)")?;
938 assert_eq!(insert_stmt.execute([1i32])?, 1);
939 assert_eq!(insert_stmt.execute([2i32])?, 1);
940 assert_eq!(insert_stmt.execute([3i32])?, 1);
941
942 let mut query = db.prepare("SELECT x FROM foo WHERE x < ? ORDER BY x DESC")?;
943 {
944 let mut rows = query.query([4i32])?;
945 let mut v = Vec::<i32>::new();
946
947 while let Some(row) = rows.next()? {
948 v.push(row.get(0)?);
949 }
950
951 assert_eq!(v, [3i32, 2, 1]);
952 }
953
954 {
955 let mut rows = query.query([3i32])?;
956 let mut v = Vec::<i32>::new();
957
958 while let Some(row) = rows.next()? {
959 v.push(row.get(0)?);
960 }
961
962 assert_eq!(v, [2i32, 1]);
963 }
964 Ok(())
965 }
966
967 #[test]
968 fn test_query_map() -> Result<()> {
969 let db = checked_memory_handle();
970 let sql = "BEGIN;
971 CREATE TABLE foo(x INTEGER, y TEXT);
972 INSERT INTO foo VALUES(4, 'hello');
973 INSERT INTO foo VALUES(3, ', ');
974 INSERT INTO foo VALUES(2, 'world');
975 INSERT INTO foo VALUES(1, '!');
976 END;";
977 db.execute_batch(sql)?;
978
979 let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
980 let results: Result<Vec<String>> = query.query([])?.map(|row| row.get(1)).collect();
981
982 assert_eq!(results?.concat(), "hello, world!");
983 Ok(())
984 }
985
986 #[test]
987 fn test_query_row() -> Result<()> {
988 let db = checked_memory_handle();
989 let sql = "BEGIN;
990 CREATE TABLE foo(x INTEGER);
991 INSERT INTO foo VALUES(1);
992 INSERT INTO foo VALUES(2);
993 INSERT INTO foo VALUES(3);
994 INSERT INTO foo VALUES(4);
995 END;";
996 db.execute_batch(sql)?;
997
998 assert_eq!(
999 10i64,
1000 db.query_row::<i64, _, _>("SELECT SUM(x) FROM foo", [], |r| r.get(0))?
1001 );
1002
1003 let result: Result<i64> = db.query_row("SELECT x FROM foo WHERE x > 5", [], |r| r.get(0));
1004 match result.unwrap_err() {
1005 Error::QueryReturnedNoRows => (),
1006 err => panic!("Unexpected error {err}"),
1007 }
1008
1009 let bad_query_result = db.query_row("NOT A PROPER QUERY; test123", [], |_| Ok(()));
1010
1011 assert!(bad_query_result.is_err());
1012 Ok(())
1013 }
1014
1015 #[test]
1016 fn test_optional() -> Result<()> {
1017 let db = checked_memory_handle();
1018
1019 let result: Result<i64> = db.query_row("SELECT 1 WHERE 0 <> 0", [], |r| r.get(0));
1020 let result = result.optional();
1021 match result? {
1022 None => (),
1023 _ => panic!("Unexpected result"),
1024 }
1025
1026 let result: Result<i64> = db.query_row("SELECT 1 WHERE 0 == 0", [], |r| r.get(0));
1027 let result = result.optional();
1028 match result? {
1029 Some(1) => (),
1030 _ => panic!("Unexpected result"),
1031 }
1032
1033 let bad_query_result: Result<i64> = db.query_row("NOT A PROPER QUERY", [], |r| r.get(0));
1034 let bad_query_result = bad_query_result.optional();
1035 assert!(bad_query_result.is_err());
1036 Ok(())
1037 }
1038
1039 #[test]
1040 fn test_prepare_failures() -> Result<()> {
1041 let db = checked_memory_handle();
1042 db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
1043
1044 let _ = db.prepare("SELECT * FROM does_not_exist").unwrap_err();
1045 Ok(())
1047 }
1048
1049 #[test]
1050 fn test_is_autocommit() {
1051 let db = checked_memory_handle();
1052 assert!(db.is_autocommit(), "autocommit expected to be active by default");
1053 }
1054
1055 #[test]
1056 #[should_panic(expected = "not supported")]
1057 fn test_statement_debugging() {
1058 let db = checked_memory_handle();
1059 let query = "SELECT 12345";
1060 let stmt = db.prepare(query).unwrap();
1061
1062 assert!(format!("{stmt:?}").contains(query));
1063 }
1064
1065 #[test]
1066 fn test_notnull_constraint_error() -> Result<()> {
1067 let db = checked_memory_handle();
1068 db.execute_batch("CREATE TABLE foo(x TEXT NOT NULL)")?;
1069
1070 let result = db.execute("INSERT INTO foo (x) VALUES (NULL)", []);
1071 assert!(result.is_err());
1072
1073 match result.unwrap_err() {
1074 Error::DuckDBFailure(err, _) => {
1075 assert_eq!(err.code, ErrorCode::Unknown);
1077 }
1078 err => panic!("Unexpected error {err}"),
1079 }
1080 Ok(())
1081 }
1082
    /// A cloned connection and its origin stay usable regardless of which one is closed first.
    #[test]
    fn test_clone() -> Result<()> {
        // Case 1: the clone is closed before the original.
        {
            let owned_con = checked_memory_handle();
            {
                let cloned_con = owned_con.try_clone().unwrap();
                cloned_con.execute_batch("create table test (c1 bigint)")?;
                cloned_con.close().unwrap();
            }
            owned_con.execute_batch("create table test2 (c1 bigint)")?;
            owned_con.close().unwrap();
        }

        // Case 2: the original is closed first and the clone outlives it.
        {
            let cloned_con = {
                let owned_con = checked_memory_handle();
                let clone = owned_con.try_clone().unwrap();
                owned_con.execute_batch("create table test (c1 bigint)")?;
                owned_con.close().unwrap();
                clone
            };
            cloned_con.execute_batch("create table test2 (c1 bigint)")?;
            cloned_con.close().unwrap();
        }
        Ok(())
    }
1111
1112 mod query_and_then_tests {
1113 use super::*;
1114
1115 #[derive(Debug)]
1116 enum CustomError {
1117 SomeError,
1118 Sqlite(Error),
1119 }
1120
1121 impl fmt::Display for CustomError {
1122 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1123 match *self {
1124 Self::SomeError => write!(f, "my custom error"),
1125 Self::Sqlite(ref se) => write!(f, "my custom error: {se}"),
1126 }
1127 }
1128 }
1129
1130 impl StdError for CustomError {
1131 fn description(&self) -> &str {
1132 "my custom error"
1133 }
1134
1135 fn cause(&self) -> Option<&dyn StdError> {
1136 match *self {
1137 Self::SomeError => None,
1138 Self::Sqlite(ref se) => Some(se),
1139 }
1140 }
1141 }
1142
1143 impl From<Error> for CustomError {
1144 fn from(se: Error) -> Self {
1145 Self::Sqlite(se)
1146 }
1147 }
1148
1149 type CustomResult<T> = Result<T, CustomError>;
1150
1151 #[test]
1152 fn test_query_and_then() -> Result<()> {
1153 let db = checked_memory_handle();
1154 let sql = "BEGIN;
1155 CREATE TABLE foo(x INTEGER, y TEXT);
1156 INSERT INTO foo VALUES(4, 'hello');
1157 INSERT INTO foo VALUES(3, ', ');
1158 INSERT INTO foo VALUES(2, 'world');
1159 INSERT INTO foo VALUES(1, '!');
1160 END;";
1161 db.execute_batch(sql)?;
1162
1163 let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1164 let results: Result<Vec<String>> = query.query_and_then([], |row| row.get(1))?.collect();
1165
1166 assert_eq!(results?.concat(), "hello, world!");
1167 Ok(())
1168 }
1169
1170 #[test]
1171 fn test_query_and_then_fails() -> Result<()> {
1172 let db = checked_memory_handle();
1173 let sql = "BEGIN;
1174 CREATE TABLE foo(x INTEGER, y TEXT);
1175 INSERT INTO foo VALUES(4, 'hello');
1176 INSERT INTO foo VALUES(3, ', ');
1177 INSERT INTO foo VALUES(2, 'world');
1178 INSERT INTO foo VALUES(1, '!');
1179 END;";
1180 db.execute_batch(sql)?;
1181
1182 let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1183 let bad_type: Result<Vec<f64>> = query.query_and_then([], |row| row.get(1))?.collect();
1184
1185 match bad_type.unwrap_err() {
1186 Error::InvalidColumnType(..) => (),
1187 err => panic!("Unexpected error {err}"),
1188 }
1189
1190 let bad_idx: Result<Vec<String>> = query.query_and_then([], |row| row.get(3))?.collect();
1191
1192 match bad_idx.unwrap_err() {
1193 Error::InvalidColumnIndex(_) => (),
1194 err => panic!("Unexpected error {err}"),
1195 }
1196 Ok(())
1197 }
1198
1199 #[test]
1200 fn test_query_and_then_custom_error() -> CustomResult<()> {
1201 let db = checked_memory_handle();
1202 let sql = "BEGIN;
1203 CREATE TABLE foo(x INTEGER, y TEXT);
1204 INSERT INTO foo VALUES(4, 'hello');
1205 INSERT INTO foo VALUES(3, ', ');
1206 INSERT INTO foo VALUES(2, 'world');
1207 INSERT INTO foo VALUES(1, '!');
1208 END;";
1209 db.execute_batch(sql)?;
1210
1211 let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1212 let results: CustomResult<Vec<String>> = query
1213 .query_and_then([], |row| row.get(1).map_err(CustomError::Sqlite))?
1214 .collect();
1215
1216 assert_eq!(results?.concat(), "hello, world!");
1217 Ok(())
1218 }
1219
1220 #[test]
1221 fn test_query_and_then_custom_error_fails() -> Result<()> {
1222 let db = checked_memory_handle();
1223 let sql = "BEGIN;
1224 CREATE TABLE foo(x INTEGER, y TEXT);
1225 INSERT INTO foo VALUES(4, 'hello');
1226 INSERT INTO foo VALUES(3, ', ');
1227 INSERT INTO foo VALUES(2, 'world');
1228 INSERT INTO foo VALUES(1, '!');
1229 END;";
1230 db.execute_batch(sql)?;
1231
1232 let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1233 let bad_type: CustomResult<Vec<f64>> = query
1234 .query_and_then([], |row| row.get(1).map_err(CustomError::Sqlite))?
1235 .collect();
1236
1237 match bad_type.unwrap_err() {
1238 CustomError::Sqlite(Error::InvalidColumnType(..)) => (),
1239 err => panic!("Unexpected error {err}"),
1240 }
1241
1242 let bad_idx: CustomResult<Vec<String>> = query
1243 .query_and_then([], |row| row.get(3).map_err(CustomError::Sqlite))?
1244 .collect();
1245
1246 match bad_idx.unwrap_err() {
1247 CustomError::Sqlite(Error::InvalidColumnIndex(_)) => (),
1248 err => panic!("Unexpected error {err}"),
1249 }
1250
1251 let non_sqlite_err: CustomResult<Vec<String>> =
1252 query.query_and_then([], |_| Err(CustomError::SomeError))?.collect();
1253
1254 match non_sqlite_err.unwrap_err() {
1255 CustomError::SomeError => (),
1256 err => panic!("Unexpected error {err}"),
1257 }
1258 Ok(())
1259 }
1260
1261 #[test]
1262 fn test_query_row_and_then_custom_error() -> CustomResult<()> {
1263 let db = checked_memory_handle();
1264 let sql = "BEGIN;
1265 CREATE TABLE foo(x INTEGER, y TEXT);
1266 INSERT INTO foo VALUES(4, 'hello');
1267 END;";
1268 db.execute_batch(sql)?;
1269
1270 let query = "SELECT x, y FROM foo ORDER BY x DESC";
1271 let results: CustomResult<String> =
1272 db.query_row_and_then(query, [], |row| row.get(1).map_err(CustomError::Sqlite));
1273
1274 assert_eq!(results?, "hello");
1275 Ok(())
1276 }
1277
1278 #[test]
1279 fn test_query_row_and_then_custom_error_fails() -> Result<()> {
1280 let db = checked_memory_handle();
1281 let sql = "BEGIN;
1282 CREATE TABLE foo(x INTEGER, y TEXT);
1283 INSERT INTO foo VALUES(4, 'hello');
1284 END;";
1285 db.execute_batch(sql)?;
1286
1287 let query = "SELECT x, y FROM foo ORDER BY x DESC";
1288 let bad_type: CustomResult<f64> =
1289 db.query_row_and_then(query, [], |row| row.get(1).map_err(CustomError::Sqlite));
1290
1291 match bad_type.unwrap_err() {
1292 CustomError::Sqlite(Error::InvalidColumnType(..)) => (),
1293 err => panic!("Unexpected error {err}"),
1294 }
1295
1296 let bad_idx: CustomResult<String> =
1297 db.query_row_and_then(query, [], |row| row.get(3).map_err(CustomError::Sqlite));
1298
1299 match bad_idx.unwrap_err() {
1300 CustomError::Sqlite(Error::InvalidColumnIndex(_)) => (),
1301 err => panic!("Unexpected error {err}"),
1302 }
1303
1304 let non_sqlite_err: CustomResult<String> =
1305 db.query_row_and_then(query, [], |_| Err(CustomError::SomeError));
1306
1307 match non_sqlite_err.unwrap_err() {
1308 CustomError::SomeError => (),
1309 err => panic!("Unexpected error {err}"),
1310 }
1311 Ok(())
1312 }
1313
1314 #[test]
1315 fn test_rows_and_then_with_custom_error() -> Result<()> {
1316 let db = checked_memory_handle();
1317 db.execute_batch("CREATE TABLE test (value INTEGER)")?;
1318 db.execute_batch("INSERT INTO test VALUES (1), (3), (5)")?;
1319
1320 let mut stmt = db.prepare("SELECT value FROM test ORDER BY value")?;
1321 let rows = stmt.query([])?;
1322
1323 let results: Vec<i32> = rows
1325 .and_then(|row| -> CustomResult<i32> {
1326 let val: i32 = row.get(0)?; if val > 10 {
1328 Err(CustomError::SomeError) } else {
1330 Ok(val)
1331 }
1332 })
1333 .collect::<CustomResult<Vec<_>>>()
1334 .unwrap();
1335
1336 assert_eq!(results, vec![1, 3, 5]);
1337 Ok(())
1338 }
1339 }
1340
1341 #[test]
1342 fn test_dynamic() -> Result<()> {
1343 let db = checked_memory_handle();
1344 let sql = "BEGIN;
1345 CREATE TABLE foo(x INTEGER, y TEXT);
1346 INSERT INTO foo VALUES(4, 'hello');
1347 END;";
1348 db.execute_batch(sql)?;
1349
1350 db.query_row("SELECT * FROM foo", [], |r| {
1351 assert_eq!(2, r.as_ref().column_count());
1352 Ok(())
1353 })
1354 }
1355 #[test]
1356 fn test_dyn_box() -> Result<()> {
1357 let db = checked_memory_handle();
1358 db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
1359 let b: Box<dyn ToSql> = Box::new(5);
1360 db.execute("INSERT INTO foo VALUES(?)", [b])?;
1361 db.query_row("SELECT x FROM foo", [], |r| {
1362 assert_eq!(5, r.get_unwrap::<_, i32>(0));
1363 Ok(())
1364 })
1365 }
1366
1367 #[test]
1368 fn test_alter_table() -> Result<()> {
1369 let db = checked_memory_handle();
1370 db.execute_batch("CREATE TABLE x(t INTEGER);")?;
1371 db.execute("ALTER TABLE x RENAME TO y;", [])?;
1373 Ok(())
1374 }
1375
1376 #[test]
1377 fn test_query_arrow_record_batch_small() -> Result<()> {
1378 let db = checked_memory_handle();
1379 let sql = "BEGIN TRANSACTION;
1380 CREATE TABLE test(t INTEGER);
1381 INSERT INTO test VALUES (1); INSERT INTO test VALUES (2); INSERT INTO test VALUES (3); INSERT INTO test VALUES (4); INSERT INTO test VALUES (5);
1382 END TRANSACTION;";
1383 db.execute_batch(sql)?;
1384 let mut stmt = db.prepare("select t from test order by t desc")?;
1385 let mut arr = stmt.query_arrow([])?;
1386
1387 let schema = arr.get_schema();
1388 assert_eq!(schema.fields().len(), 1);
1389 assert_eq!(schema.field(0).name(), "t");
1390 assert_eq!(schema.field(0).data_type(), &DataType::Int32);
1391
1392 let rb = arr.next().unwrap();
1393 let column = rb.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1394 assert_eq!(column.len(), 5);
1395 assert_eq!(column.value(0), 5);
1396 assert_eq!(column.value(1), 4);
1397 assert_eq!(column.value(2), 3);
1398 assert_eq!(column.value(3), 2);
1399 assert_eq!(column.value(4), 1);
1400
1401 assert!(arr.next().is_none());
1402 Ok(())
1403 }
1404
1405 #[test]
1406 fn test_query_arrow_record_batch_large() -> Result<()> {
1407 let db = checked_memory_handle();
1408 db.execute_batch("BEGIN TRANSACTION")?;
1409 db.execute_batch("CREATE TABLE test(t INTEGER);")?;
1410 for _ in 0..600 {
1411 db.execute_batch("INSERT INTO test VALUES (1); INSERT INTO test VALUES (2); INSERT INTO test VALUES (3); INSERT INTO test VALUES (4); INSERT INTO test VALUES (5);")?;
1412 }
1413 db.execute_batch("END TRANSACTION")?;
1414 let rbs: Vec<RecordBatch> = db.prepare("select t from test order by t")?.query_arrow([])?.collect();
1415 assert_eq!(rbs.iter().map(|rb| rb.num_rows()).sum::<usize>(), 3000);
1418 assert_eq!(
1419 rbs.iter()
1420 .map(|rb| rb
1421 .column(0)
1422 .as_any()
1423 .downcast_ref::<Int32Array>()
1424 .unwrap()
1425 .iter()
1426 .map(|i| i.unwrap())
1427 .sum::<i32>())
1428 .sum::<i32>(),
1429 9000
1430 );
1431 Ok(())
1432 }
1433
1434 #[test]
1435 fn round_trip_interval() -> Result<()> {
1436 let db = checked_memory_handle();
1437 db.execute_batch("CREATE TABLE foo (t INTERVAL);")?;
1438
1439 let d = Value::Interval {
1440 months: 1,
1441 days: 2,
1442 nanos: 3,
1443 };
1444 db.execute("INSERT INTO foo VALUES (?)", [d])?;
1445
1446 let mut stmt = db.prepare("SELECT t FROM foo")?;
1447 let mut rows = stmt.query([])?;
1448 let row = rows.next()?.unwrap();
1449 let d: Value = row.get_unwrap(0);
1450 assert_eq!(d, d);
1451 Ok(())
1452 }
1453
1454 #[test]
1455 fn test_database_name_to_string() -> Result<()> {
1456 assert_eq!(DatabaseName::Main.to_string(), "main");
1457 assert_eq!(DatabaseName::Temp.to_string(), "temp");
1458 assert_eq!(DatabaseName::Attached("abc").to_string(), "abc");
1459 Ok(())
1460 }
1461
    /// Interrupting a long-running query from another thread makes it fail with an
    /// error whose message contains "INTERRUPT".
    #[test]
    fn test_interrupt() -> Result<()> {
        let db = checked_memory_handle();
        // Taken before `db` is moved into the worker thread.
        let db_interrupt = db.interrupt_handle();

        let (tx, rx) = std::sync::mpsc::channel();
        std::thread::spawn(move || {
            // Cross join of two large ranges: expected to run long enough to be interrupted.
            let mut stmt = db
                .prepare("select count(*) from range(10000000) t1, range(1000000) t2")
                .unwrap();
            tx.send(stmt.execute([])).unwrap();
        });

        // Give the worker time to start executing before interrupting it.
        std::thread::sleep(std::time::Duration::from_millis(100));
        db_interrupt.interrupt();

        // The worker must report back within five seconds, otherwise the test panics.
        let result = rx.recv_timeout(std::time::Duration::from_secs(5)).unwrap();
        assert!(result.is_err_and(|err| err.to_string().contains("INTERRUPT")));
        Ok(())
    }
1482
1483 #[test]
1484 fn test_interrupt_on_dropped_db() {
1485 let db = checked_memory_handle();
1486 let db_interrupt = db.interrupt_handle();
1487
1488 drop(db);
1489 db_interrupt.interrupt();
1490 }
1491
1492 #[cfg(feature = "bundled")]
1493 #[test]
1494 fn test_version() -> Result<()> {
1495 let db = checked_memory_handle();
1496 let expected: String = format!("v{}", env!("CARGO_PKG_VERSION"));
1497 let actual = db.version()?;
1498 assert_eq!(expected, actual);
1499 Ok(())
1500 }
1501
/// Checks which Arrow string type a VARCHAR column is reported as under two
/// configurations.
///
/// With only `produce_arrow_string_view` enabled the test expects `Utf8`; with
/// `arrow_output_version` additionally set to `1.4` it expects `Utf8View`.
#[test]
fn test_arrow_string_view_setting() -> Result<()> {
    // Case 1: string-view flag only.
    {
        let cfg = Config::default().with("produce_arrow_string_view", "true")?;
        let conn = Connection::open_in_memory_with_flags(cfg)?;

        let mut stmt = conn.prepare("SELECT 'test'::varchar AS str")?;
        let first_batch = stmt
            .query_arrow([])?
            .into_iter()
            .next()
            .expect("Expected at least one batch");

        let schema = first_batch.schema();
        assert_eq!(schema.field(0).data_type(), &DataType::Utf8);
    }

    // Case 2: string-view flag plus an explicit Arrow output version.
    {
        let cfg = Config::default()
            .with("produce_arrow_string_view", "true")?
            .with("arrow_output_version", "1.4")?;
        let conn = Connection::open_in_memory_with_flags(cfg)?;

        let mut stmt = conn.prepare("SELECT 'test'::varchar AS str")?;
        let first_batch = stmt
            .query_arrow([])?
            .into_iter()
            .next()
            .expect("Expected at least one batch");

        let schema = first_batch.schema();
        assert_eq!(schema.field(0).data_type(), &DataType::Utf8View);
    }

    Ok(())
}
1531
/// Prepares SQL strings that contain several statements and checks that
/// `query_row` yields the value selected by the final `SELECT`.
#[test]
fn test_prepare_multi_statement() -> Result<()> {
    let conn = checked_memory_handle();

    // Single-line script: create, insert, then select the inserted value.
    {
        let script = "CREATE TABLE test(x INTEGER); INSERT INTO test VALUES (42); SELECT x FROM test;";
        let mut stmt = conn.prepare(script)?;
        let selected: i32 = stmt.query_row([], |row| row.get(0))?;
        assert_eq!(selected, 42);
    }

    // Multi-line script against a temporary table; two rows are inserted.
    {
        let script = "CREATE TEMP TABLE temp_data(id INTEGER, value TEXT);
                 INSERT INTO temp_data VALUES (1, 'first'), (2, 'second');
                 SELECT COUNT(*) FROM temp_data;";
        let mut stmt = conn.prepare(script)?;
        let row_total: i32 = stmt.query_row([], |row| row.get(0))?;
        assert_eq!(row_total, 2);
    }

    Ok(())
}
1555
/// Prepares and runs a `PIVOT` statement over a small fixture table and checks
/// that it yields two rows (presumably one per city in the fixture).
#[test]
fn test_pivot_query() -> Result<()> {
    let conn = checked_memory_handle();

    // Fixture: two cities, three census years each.
    conn.execute_batch(
        "CREATE TABLE cities(city VARCHAR, year INTEGER, population INTEGER);
             INSERT INTO cities VALUES
                ('Amsterdam', 2000, 1005),
                ('Amsterdam', 2010, 1065),
                ('Amsterdam', 2020, 1158),
                ('Berlin', 2000, 3382),
                ('Berlin', 2010, 3460),
                ('Berlin', 2020, 3576);",
    )?;

    let mut pivot = conn.prepare("PIVOT cities ON year USING sum(population);")?;
    let mut rows = pivot.query([])?;

    // Only the number of rows matters here, not their contents.
    let mut seen = 0;
    while rows.next()?.is_some() {
        seen += 1;
    }
    assert_eq!(seen, 2);

    Ok(())
}
1583
/// Exercises two properties of in-memory connections:
///
/// 1. Two separately opened in-memory connections each see only their own
///    `test` table contents.
/// 2. A connection obtained through `try_clone` reads and writes the same
///    table as the connection it was cloned from.
#[test]
fn test_multiple_memory_databases() -> Result<()> {
    // Part 1: independently opened connections.
    {
        let first = Connection::open_in_memory()?;
        let second = Connection::open_in_memory()?;

        first.execute_batch("CREATE TABLE test (id INTEGER)")?;
        first.execute("INSERT INTO test VALUES (1)", [])?;

        // Same table name on the second connection, different value.
        second.execute_batch("CREATE TABLE test (id INTEGER)")?;
        second.execute("INSERT INTO test VALUES (2)", [])?;

        let from_first: i32 = first.query_row("SELECT id FROM test", [], |row| row.get(0))?;
        assert_eq!(from_first, 1);

        let from_second: i32 = second.query_row("SELECT id FROM test", [], |row| row.get(0))?;
        assert_eq!(from_second, 2);
    }

    // Part 2: a cloned connection.
    {
        let origin = Connection::open_in_memory()?;

        origin.execute_batch("CREATE TABLE shared_table (id INTEGER)")?;
        origin.execute("INSERT INTO shared_table VALUES (123)", [])?;

        let sibling = origin.try_clone()?;

        // The row written through `origin` is readable through the clone.
        let seen_by_sibling: i32 = sibling.query_row("SELECT id FROM shared_table", [], |row| row.get(0))?;
        assert_eq!(seen_by_sibling, 123);

        sibling.execute("INSERT INTO shared_table VALUES (456)", [])?;

        // The row written through the clone is counted when querying `origin`.
        let total: i64 = origin.query_row("SELECT COUNT(*) FROM shared_table", [], |row| row.get(0))?;
        assert_eq!(total, 2);
    }

    Ok(())
}
1626
/// Appends three rows to a table in an attached database file through
/// `appender_to_catalog_and_db`, then checks the row count and one looked-up
/// value.
#[test]
fn test_appender_with_catalog() -> Result<()> {
    let conn = checked_memory_handle();

    // The attached database lives in a temporary directory that is kept alive
    // until the end of the test.
    let scratch = tempfile::tempdir().unwrap();
    let db_file = scratch.path().join("attached.db");
    let attach_sql = format!("ATTACH '{}' AS attached_db", db_file.display());
    conn.execute_batch(&attach_sql)?;

    conn.execute_batch("CREATE TABLE attached_db.main.test_table (id INTEGER, name TEXT)")?;

    // The appender goes out of scope at the end of this block; the assertions
    // below rely on the appended rows being visible afterwards.
    {
        let mut appender = conn.appender_to_catalog_and_db("test_table", "attached_db", "main")?;
        for (id, name) in [(1, "Alice"), (2, "Bob"), (3, "Charlie")] {
            appender.append_row(params![id, name])?;
        }
    }

    let total: i64 = conn.query_row("SELECT COUNT(*) FROM attached_db.main.test_table", [], |row| row.get(0))?;
    assert_eq!(total, 3);

    let looked_up: String = conn.query_row(
        "SELECT name FROM attached_db.main.test_table WHERE id = ?",
        [2],
        |row| row.get(0),
    )?;
    assert_eq!(looked_up, "Bob");

    Ok(())
}
1658
/// Creates two schemas in an attached catalog, each holding a table named
/// `data`, appends different values to each through
/// `appender_to_catalog_and_db`, and checks the per-schema sums.
#[test]
fn test_appender_with_catalog_multiple_schemas() -> Result<()> {
    let conn = checked_memory_handle();

    let scratch = tempfile::tempdir().unwrap();
    let db_file = scratch.path().join("multi_schema.db");
    let attach_sql = format!("ATTACH '{}' AS my_catalog", db_file.display());
    conn.execute_batch(&attach_sql)?;

    // Identically named tables in two different schemas.
    conn.execute_batch("CREATE SCHEMA my_catalog.schema1")?;
    conn.execute_batch("CREATE SCHEMA my_catalog.schema2")?;
    conn.execute_batch("CREATE TABLE my_catalog.schema1.data (value INTEGER)")?;
    conn.execute_batch("CREATE TABLE my_catalog.schema2.data (value INTEGER)")?;

    // 10 + 20 + 30 goes to schema1.
    {
        let mut appender = conn.appender_to_catalog_and_db("data", "my_catalog", "schema1")?;
        appender.append_rows([[10], [20], [30]])?;
    }

    // 100 + 200 goes to schema2.
    {
        let mut appender = conn.appender_to_catalog_and_db("data", "my_catalog", "schema2")?;
        appender.append_rows([[100], [200]])?;
    }

    let schema1_sum: i64 = conn.query_row("SELECT SUM(value) FROM my_catalog.schema1.data", [], |row| row.get(0))?;
    assert_eq!(schema1_sum, 60);

    let schema2_sum: i64 = conn.query_row("SELECT SUM(value) FROM my_catalog.schema2.data", [], |row| row.get(0))?;
    assert_eq!(schema2_sum, 300);

    Ok(())
}
1696
/// Creates a table named `test` both in the connection's own database and in
/// an attached database, appends two rows to each by naming the catalog
/// explicitly, and checks that each table ends up with exactly two rows.
#[test]
fn test_appender_with_catalog_main_vs_attached() -> Result<()> {
    let conn = checked_memory_handle();

    // Table in the connection's own database.
    conn.execute_batch("CREATE TABLE test (id INTEGER)")?;

    // Same table name in an attached, file-backed database.
    let scratch = tempfile::tempdir().unwrap();
    let db_file = scratch.path().join("other.db");
    let attach_sql = format!("ATTACH '{}' AS other_db", db_file.display());
    conn.execute_batch(&attach_sql)?;
    conn.execute_batch("CREATE TABLE other_db.main.test (id INTEGER)")?;

    // The catalog name "memory" is used for the connection's own database.
    {
        let mut appender = conn.appender_to_catalog_and_db("test", "memory", "main")?;
        appender.append_rows([[1], [2]])?;
    }

    {
        let mut appender = conn.appender_to_catalog_and_db("test", "other_db", "main")?;
        appender.append_rows([[100], [200]])?;
    }

    // Two rows in each table means neither appender wrote into the other one.
    let own_rows: i64 = conn.query_row("SELECT COUNT(*) FROM test", [], |row| row.get(0))?;
    assert_eq!(own_rows, 2);

    let attached_rows: i64 = conn.query_row("SELECT COUNT(*) FROM other_db.main.test", [], |row| row.get(0))?;
    assert_eq!(attached_rows, 2);

    Ok(())
}
1732
/// Expects `appender_to_catalog_and_db` to return an error when the named
/// catalog has not been attached.
#[test]
fn test_appender_with_catalog_error_invalid_catalog() -> Result<()> {
    let conn = checked_memory_handle();

    // No catalog called `nonexistent_catalog` was ever attached to `conn`.
    assert!(conn
        .appender_to_catalog_and_db("test", "nonexistent_catalog", "main")
        .is_err());

    Ok(())
}
1743
/// Expects `appender_to_catalog_and_db` to return an error when the catalog
/// and table exist but the named schema does not.
#[test]
fn test_appender_with_catalog_error_invalid_schema() -> Result<()> {
    let conn = checked_memory_handle();

    let scratch = tempfile::tempdir().unwrap();
    let db_file = scratch.path().join("test.db");
    let attach_sql = format!("ATTACH '{}' AS my_db", db_file.display());
    conn.execute_batch(&attach_sql)?;

    // The table is created in schema `main` only.
    conn.execute_batch("CREATE TABLE my_db.main.test (id INTEGER)")?;

    assert!(conn
        .appender_to_catalog_and_db("test", "my_db", "nonexistent_schema")
        .is_err());

    Ok(())
}
1761
/// Interleaves `append_row` and explicit `flush` calls on an appender that
/// targets an attached database, then checks that all three rows are present.
#[test]
fn test_appender_with_catalog_flush() -> Result<()> {
    let conn = checked_memory_handle();

    let scratch = tempfile::tempdir().unwrap();
    let db_file = scratch.path().join("flush_test.db");
    let attach_sql = format!("ATTACH '{}' AS flush_db", db_file.display());
    conn.execute_batch(&attach_sql)?;

    conn.execute_batch("CREATE TABLE flush_db.main.test (id INTEGER)")?;

    {
        let mut appender = conn.appender_to_catalog_and_db("test", "flush_db", "main")?;

        // First batch: two rows, then flush.
        appender.append_row([1])?;
        appender.append_row([2])?;
        appender.flush()?;

        // The appender is used again after the flush for a third row.
        appender.append_row([3])?;
        appender.flush()?;
    }

    let total: i64 = conn.query_row("SELECT COUNT(*) FROM flush_db.main.test", [], |row| row.get(0))?;
    assert_eq!(total, 3);

    Ok(())
}
1789}