1use super::{ffi, AppenderParams, Connection, Result, ValueRef};
2use std::{ffi::c_void, fmt, os::raw::c_char};
3
4use crate::{
5 error::result_from_duckdb_appender,
6 types::{ToSql, ToSqlOutput},
7 Error,
8};
9
10pub struct Appender<'conn> {
60 conn: &'conn Connection,
61 app: ffi::duckdb_appender,
62}
63
64#[cfg(feature = "appender-arrow")]
65mod arrow;
66
67impl Appender<'_> {
68 #[inline]
85 pub fn append_rows<P, I>(&mut self, rows: I) -> Result<()>
86 where
87 I: IntoIterator<Item = P>,
88 P: AppenderParams,
89 {
90 for row in rows {
91 self.append_row(row)?;
92 }
93 Ok(())
94 }
95
96 #[inline]
113 pub fn append_row<P: AppenderParams>(&mut self, params: P) -> Result<()> {
114 let _ = unsafe { ffi::duckdb_appender_begin_row(self.app) };
115 params.__bind_in(self)?;
116 let rc = unsafe { ffi::duckdb_appender_end_row(self.app) };
118 result_from_duckdb_appender(rc, &mut self.app)
119 }
120
121 #[inline]
122 pub(crate) fn bind_parameters<P>(&mut self, params: P) -> Result<()>
123 where
124 P: IntoIterator,
125 P::Item: ToSql,
126 {
127 for p in params.into_iter() {
128 self.bind_parameter(&p)?;
129 }
130 Ok(())
131 }
132
133 fn bind_parameter<P: ?Sized + ToSql>(&self, param: &P) -> Result<()> {
134 let value = param.to_sql()?;
135
136 let ptr = self.app;
137 let value = match value {
138 ToSqlOutput::Borrowed(v) => v,
139 ToSqlOutput::Owned(ref v) => ValueRef::from(v),
140 };
141 let rc = match value {
145 ValueRef::Null => unsafe { ffi::duckdb_append_null(ptr) },
146 ValueRef::Boolean(i) => unsafe { ffi::duckdb_append_bool(ptr, i) },
147 ValueRef::TinyInt(i) => unsafe { ffi::duckdb_append_int8(ptr, i) },
148 ValueRef::SmallInt(i) => unsafe { ffi::duckdb_append_int16(ptr, i) },
149 ValueRef::Int(i) => unsafe { ffi::duckdb_append_int32(ptr, i) },
150 ValueRef::BigInt(i) => unsafe { ffi::duckdb_append_int64(ptr, i) },
151 ValueRef::UTinyInt(i) => unsafe { ffi::duckdb_append_uint8(ptr, i) },
152 ValueRef::USmallInt(i) => unsafe { ffi::duckdb_append_uint16(ptr, i) },
153 ValueRef::UInt(i) => unsafe { ffi::duckdb_append_uint32(ptr, i) },
154 ValueRef::UBigInt(i) => unsafe { ffi::duckdb_append_uint64(ptr, i) },
155 ValueRef::HugeInt(i) => unsafe {
156 let hi = ffi::duckdb_hugeint {
157 lower: i as u64,
158 upper: (i >> 64) as i64,
159 };
160 ffi::duckdb_append_hugeint(ptr, hi)
161 },
162
163 ValueRef::Float(r) => unsafe { ffi::duckdb_append_float(ptr, r) },
164 ValueRef::Double(r) => unsafe { ffi::duckdb_append_double(ptr, r) },
165 ValueRef::Text(s) => unsafe {
166 ffi::duckdb_append_varchar_length(ptr, s.as_ptr() as *const c_char, s.len() as u64)
167 },
168 ValueRef::Timestamp(u, i) => unsafe {
169 ffi::duckdb_append_timestamp(ptr, ffi::duckdb_timestamp { micros: u.to_micros(i) })
170 },
171 ValueRef::Blob(b) => unsafe { ffi::duckdb_append_blob(ptr, b.as_ptr() as *const c_void, b.len() as u64) },
172 ValueRef::Date32(d) => unsafe { ffi::duckdb_append_date(ptr, ffi::duckdb_date { days: d }) },
173 ValueRef::Time64(u, v) => unsafe {
174 ffi::duckdb_append_time(ptr, ffi::duckdb_time { micros: u.to_micros(v) })
175 },
176 ValueRef::Interval { months, days, nanos } => unsafe {
177 ffi::duckdb_append_interval(
178 ptr,
179 ffi::duckdb_interval {
180 months,
181 days,
182 micros: nanos / 1000,
183 },
184 )
185 },
186 _ => unreachable!("not supported"),
187 };
188 if rc != 0 {
189 return Err(Error::AppendError);
190 }
191 Ok(())
192 }
193
194 #[inline]
195 pub(super) fn new(conn: &Connection, app: ffi::duckdb_appender) -> Appender<'_> {
196 Appender { conn, app }
197 }
198
199 #[inline]
201 pub fn flush(&mut self) -> Result<()> {
202 unsafe {
203 let res = ffi::duckdb_appender_flush(self.app);
204 result_from_duckdb_appender(res, &mut self.app)
205 }
206 }
207
208 #[inline]
215 pub fn add_column(&mut self, name: &str) -> Result<()> {
216 let c_name = std::ffi::CString::new(name)?;
217 let rc = unsafe { ffi::duckdb_appender_add_column(self.app, c_name.as_ptr() as *const c_char) };
218 result_from_duckdb_appender(rc, &mut self.app)
219 }
220
221 #[inline]
228 pub fn clear_columns(&mut self) -> Result<()> {
229 let rc = unsafe { ffi::duckdb_appender_clear_columns(self.app) };
230 result_from_duckdb_appender(rc, &mut self.app)
231 }
232}
233
234impl Drop for Appender<'_> {
235 fn drop(&mut self) {
236 if !self.app.is_null() {
237 unsafe {
238 ffi::duckdb_appender_destroy(&mut self.app);
239 }
240 }
241 }
242}
243
244impl fmt::Debug for Appender<'_> {
245 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246 f.debug_struct("Appender").field("conn", self.conn).finish()
247 }
248}
249
#[cfg(test)]
mod test {
    use crate::{params, Connection, Error, Result};

    /// Opens a fresh in-memory database and runs `ddl` against it as a batch.
    fn open_with(ddl: &str) -> Result<Connection> {
        let db = Connection::open_in_memory()?;
        db.execute_batch(ddl)?;
        Ok(db)
    }

    /// A single integer row is readable after the appender is dropped.
    #[test]
    fn test_append_one_row() -> Result<()> {
        let db = open_with("CREATE TABLE foo(x INTEGER)")?;

        let mut appender = db.appender("foo")?;
        appender.append_row([42])?;
        // Release the appender before reading the table back.
        drop(appender);

        let row = db.query_row("SELECT x FROM foo", [], |row| <(i32,)>::try_from(row))?;
        assert_eq!(row, (42,));
        Ok(())
    }

    /// `append_rows` appends every row of the input.
    #[test]
    fn test_append_rows() -> Result<()> {
        let db = open_with("CREATE TABLE foo(x INTEGER, y INTEGER)")?;

        let mut appender = db.appender("foo")?;
        appender.append_rows([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]])?;
        drop(appender);

        let sums = db.query_row("SELECT sum(x), sum(y) FROM foo", [], |row| {
            <(i32, i32)>::try_from(row)
        })?;
        assert_eq!(sums, (25, 30));
        Ok(())
    }

    /// A UUID value round-trips through a UUID column.
    #[cfg(feature = "uuid")]
    #[test]
    fn test_append_uuid() -> Result<()> {
        use uuid::Uuid;

        let db = open_with("CREATE TABLE foo(x UUID)")?;
        let id = Uuid::new_v4();

        let mut appender = db.appender("foo")?;
        appender.append_row([id])?;
        drop(appender);

        let row = db.query_row("SELECT x FROM foo", [], |row| <(Uuid,)>::try_from(row))?;
        assert_eq!(row, (id,));
        Ok(())
    }

    /// A timestamp written as text into a TIMESTAMP column reads back as
    /// the expected microsecond count.
    #[test]
    fn test_append_string_as_ts_row() -> Result<()> {
        let db = open_with("CREATE TABLE foo(x TIMESTAMP)")?;

        let mut appender = db.appender("foo")?;
        appender.append_row(["2022-04-09 15:56:37.544"])?;
        drop(appender);

        let row = db.query_row("SELECT x FROM foo", [], |row| <(i64,)>::try_from(row))?;
        assert_eq!(row, (1649519797544000,));
        Ok(())
    }

    /// A `Duration` appended to a TIMESTAMP column can be found again by
    /// filtering on the same `Duration`.
    #[test]
    fn test_append_timestamp() -> Result<()> {
        use std::time::Duration;

        let db = open_with("CREATE TABLE foo(x TIMESTAMP)")?;
        let d = Duration::from_secs(1);

        let mut appender = db.appender("foo")?;
        appender.append_row([d])?;
        drop(appender);

        let row = db.query_row("SELECT x FROM foo where x=?", [d], |row| <(i32,)>::try_from(row))?;
        assert_eq!(row, (d.as_micros() as i32,));
        Ok(())
    }

    /// chrono date and datetime values round-trip through DATE and
    /// TIMESTAMP columns.
    #[test]
    #[cfg(feature = "chrono")]
    fn test_append_datetime() -> Result<()> {
        use chrono::{NaiveDate, NaiveDateTime};

        let db = open_with("CREATE TABLE foo(x DATE, y TIMESTAMP)")?;

        let date = NaiveDate::from_ymd_opt(2024, 6, 5).unwrap();
        let timestamp = date.and_hms_opt(18, 26, 53).unwrap();

        let mut appender = db.appender("foo")?;
        appender.append_row(params![date, timestamp])?;
        drop(appender);

        let (date2, timestamp2) = db.query_row("SELECT x, y FROM foo", [], |row| {
            let stored_date = row.get::<_, NaiveDate>(0)?;
            let stored_timestamp = row.get::<_, NaiveDateTime>(1)?;
            Ok((stored_date, stored_timestamp))
        })?;
        assert_eq!(date, date2);
        assert_eq!(timestamp, timestamp2);
        Ok(())
    }

    /// Struct fields can be appended row by row through `params!`.
    #[test]
    #[cfg(feature = "chrono")]
    fn test_append_struct_with_params() -> Result<()> {
        use chrono::NaiveDate;

        struct Person {
            first_name: String,
            last_name: String,
            dob: NaiveDate,
        }

        let db = open_with("CREATE TABLE foo(first_name VARCHAR, last_name VARCHAR, dob DATE);")?;

        let person1 = Person {
            first_name: String::from("John"),
            last_name: String::from("Smith"),
            dob: NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(),
        };
        let person2 = Person {
            first_name: String::from("Jane"),
            last_name: String::from("Smith"),
            dob: NaiveDate::from_ymd_opt(1975, 1, 1).unwrap(),
        };

        let mut appender = db.appender("foo")?;
        for person in [&person1, &person2] {
            appender.append_row(params![&person.first_name, &person.last_name, person.dob])?;
        }
        drop(appender);

        let count: i64 = db.query_row("SELECT count(*) FROM foo", [], |row| row.get(0))?;
        assert_eq!(count, 2);

        Ok(())
    }

    /// Supplying too few columns makes `append_row` fail with DuckDB's
    /// own error message.
    #[test]
    fn test_appender_error() -> Result<()> {
        let conn = Connection::open_in_memory()?;
        conn.execute(
            r"CREATE TABLE foo (
                foobar TEXT,
                foobar_int INT,
                foobar_split TEXT[] AS (split(trim(foobar), ','))
            );",
            [],
        )?;
        let mut appender = conn.appender("foo")?;

        let outcome = appender.append_row(params!["foo"]);
        match outcome {
            Err(Error::DuckDBFailure(.., Some(msg))) => {
                assert_eq!(msg, "Call to EndRow before all columns have been appended to!")
            }
            Err(err) => panic!("unexpected error: {err:?}"),
            Ok(_) => panic!("expected an error but got Ok"),
        }
        Ok(())
    }

    /// A foreign-key violation is reported by `flush`, not by `append_row`.
    #[test]
    fn test_appender_foreign_key_constraint() -> Result<()> {
        let conn = open_with(
            r"
            CREATE TABLE parent (id INTEGER PRIMARY KEY);
            CREATE TABLE child (
                id INTEGER,
                parent_id INTEGER,
                FOREIGN KEY (parent_id) REFERENCES parent(id)
            );",
        )?;
        conn.execute("INSERT INTO parent VALUES (1)", [])?;

        let mut appender = conn.appender("child")?;
        // Appending the row succeeds; 999 has no matching parent row, which
        // is expected to surface only on the explicit flush below.
        appender.append_row(params![1, 999])?;

        let flushed = appender.flush();
        match flushed {
            Err(Error::DuckDBFailure(_, Some(msg))) => {
                assert_eq!(
                    msg,
                    "Failed to append: Violates foreign key constraint because key \"id: 999\" does not exist in the referenced table"
                );
            }
            Err(e) => panic!("Expected foreign key constraint error, got: {e:?}"),
            Ok(_) => panic!("Expected foreign key constraint error, but flush succeeded"),
        }

        Ok(())
    }

    /// Columns left out of the active column list take their defaults, and
    /// the list can be changed and cleared on a live appender.
    #[test]
    fn test_appender_defaults_and_column_switching() -> Result<()> {
        let db = open_with("CREATE TABLE foo(a INT DEFAULT 99, b INT, c INT DEFAULT 7)")?;

        // Only `b` is supplied; `a` and `c` are expected to take defaults.
        let mut only_b = db.appender_with_columns("foo", &["b"])?;
        only_b.append_row([Some(1)])?;
        only_b.append_row([Option::<i32>::None])?;
        drop(only_b);

        // Supply `c` then `a`, then clear the list and supply all columns.
        let mut switching = db.appender("foo")?;
        switching.add_column("c")?;
        switching.add_column("a")?;
        switching.append_row([10, 1])?;
        switching.clear_columns()?;
        switching.append_row([2, 3, 4])?;
        drop(switching);

        let mut stmt = db.prepare("SELECT a, b, c FROM foo ORDER BY a, b NULLS LAST")?;
        let rows: Vec<(i32, Option<i32>, i32)> = stmt
            .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?
            .collect::<Result<_>>()?;

        let expected = vec![
            (1, None, 10),
            (2, Some(3), 4),
            (99, Some(1), 7),
            (99, None, 7),
        ];
        assert_eq!(rows, expected);

        Ok(())
    }

    /// A sequence-backed default is evaluated for each appended row.
    #[test]
    fn test_appender_with_columns_sequence_default() -> Result<()> {
        let db = open_with(
            "CREATE SEQUENCE seq START 1;
             CREATE TABLE foo(id INTEGER DEFAULT nextval('seq'), name TEXT)",
        )?;

        let mut appender = db.appender_with_columns("foo", &["name"])?;
        appender.append_row(["Alice"])?;
        appender.append_row(["Bob"])?;
        appender.append_row(["Charlie"])?;
        drop(appender);

        let mut stmt = db.prepare("SELECT id, name FROM foo ORDER BY id")?;
        let rows: Vec<(i32, String)> = stmt
            .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
            .collect::<Result<_>>()?;

        let expected: Vec<(i32, String)> =
            vec![(1, "Alice".into()), (2, "Bob".into()), (3, "Charlie".into())];
        assert_eq!(rows, expected);

        Ok(())
    }

    /// A column-restricted appender can target a table in a named schema.
    #[test]
    fn test_appender_with_columns_to_db_schema() -> Result<()> {
        let db = open_with(
            "CREATE SCHEMA s;
             CREATE TABLE s.foo(a INTEGER DEFAULT 5, b INTEGER)",
        )?;

        let mut appender = db.appender_with_columns_to_db("foo", "s", &["b"])?;
        appender.append_row([7])?;
        drop(appender);

        let stored: (i32, i32) =
            db.query_row("SELECT a, b FROM s.foo", [], |row| Ok((row.get(0)?, row.get(1)?)))?;
        assert_eq!(stored, (5, 7));
        Ok(())
    }

    /// A column-restricted appender can target a table by catalog and schema.
    #[test]
    fn test_appender_with_columns_to_catalog_and_db() -> Result<()> {
        let db = open_with(
            "CREATE SCHEMA s;
             CREATE TABLE s.bar(a INTEGER DEFAULT 11, b INTEGER)",
        )?;

        let mut appender = db.appender_with_columns_to_catalog_and_db("bar", "memory", "s", &["b"])?;
        appender.append_row([9])?;
        drop(appender);

        let stored: (i32, i32) =
            db.query_row("SELECT a, b FROM s.bar", [], |row| Ok((row.get(0)?, row.get(1)?)))?;
        assert_eq!(stored, (11, 9));
        Ok(())
    }
}