// duckdb/appender/mod.rs

1use super::{ffi, AppenderParams, Connection, Result, ValueRef};
2use std::{ffi::c_void, fmt, os::raw::c_char};
3
4use crate::{
5    error::result_from_duckdb_appender,
6    types::{ToSql, ToSqlOutput},
7    Error,
8};
9
10/// Appender for fast import data
11///
12/// # Thread Safety
13///
14/// `Appender` is neither `Send` nor `Sync`:
15/// - Not `Send` because it holds a reference to `Connection`, which is `!Sync`
16/// - Not `Sync` because DuckDB appenders don't support concurrent access
17///
18/// To use an appender in another thread, move the `Connection` to that thread
19/// and create the appender there.
20///
21/// If you need to share an `Appender` across threads, wrap it in a `Mutex`.
22///
23/// See [DuckDB concurrency documentation](https://duckdb.org/docs/stable/connect/concurrency.html) for more details.
24///
25/// # Wide Tables (Many Columns)
26///
27/// Array literals `[value; N]` are supported for tables with up to 32 columns.
28///
29/// ```rust,ignore
30/// appender.append_row([0; 32])?;
31/// appender.append_row([1, 2, 3, 4, 5])?;
32/// ```
33///
34/// For tables with more than 32 columns, use one of these alternatives:
35///
36/// ## 1. Slice approach - convert values to `&dyn ToSql`
37///
38/// ```rust,ignore
39/// let values: Vec<i32> = vec![0; 100];
40/// let params: Vec<&dyn ToSql> = values.iter().map(|v| v as &dyn ToSql).collect();
41/// appender.append_row(params.as_slice())?;
42/// ```
43///
44/// ## 2. `params!` macro - write values explicitly
45///
46/// ```rust,ignore
47/// appender.append_row(params![v1, v2, v3, ..., v50])?;
48/// ```
49///
50/// ## 3. `appender_params_from_iter` - pass an iterator directly
51///
52/// ```rust,ignore
53/// use duckdb::appender_params_from_iter;
54/// let values: Vec<i32> = vec![0; 100];
55/// appender.append_row(appender_params_from_iter(values))?;
56/// ```
57///
58/// All three methods can be used interchangeably and mixed in the same appender.
59pub struct Appender<'conn> {
60    conn: &'conn Connection,
61    app: ffi::duckdb_appender,
62}
63
// Submodule `arrow`, compiled only when the `appender-arrow` feature is
// enabled (presumably Arrow-based appending; see `arrow.rs`).
#[cfg(feature = "appender-arrow")]
mod arrow;
66
67impl Appender<'_> {
68    /// Append multiple rows from Iterator
69    ///
70    /// ## Example
71    ///
72    /// ```rust,no_run
73    /// # use duckdb::{Connection, Result, params};
74    /// fn insert_rows(conn: &Connection) -> Result<()> {
75    ///     let mut app = conn.appender("foo")?;
76    ///     app.append_rows([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]])?;
77    ///     Ok(())
78    /// }
79    /// ```
80    ///
81    /// # Failure
82    ///
83    /// Will return `Err` if append column count not the same with the table schema
84    #[inline]
85    pub fn append_rows<P, I>(&mut self, rows: I) -> Result<()>
86    where
87        I: IntoIterator<Item = P>,
88        P: AppenderParams,
89    {
90        for row in rows {
91            self.append_row(row)?;
92        }
93        Ok(())
94    }
95
96    /// Append one row
97    ///
98    /// ## Example
99    ///
100    /// ```rust,no_run
101    /// # use duckdb::{Connection, Result, params};
102    /// fn insert_row(conn: &Connection) -> Result<()> {
103    ///     let mut app = conn.appender("foo")?;
104    ///     app.append_row([1, 2])?;
105    ///     Ok(())
106    /// }
107    /// ```
108    ///
109    /// # Failure
110    ///
111    /// Will return `Err` if append column count not the same with the table schema
112    #[inline]
113    pub fn append_row<P: AppenderParams>(&mut self, params: P) -> Result<()> {
114        let _ = unsafe { ffi::duckdb_appender_begin_row(self.app) };
115        params.__bind_in(self)?;
116        // NOTE: we only check end_row return value
117        let rc = unsafe { ffi::duckdb_appender_end_row(self.app) };
118        result_from_duckdb_appender(rc, &mut self.app)
119    }
120
121    #[inline]
122    pub(crate) fn bind_parameters<P>(&mut self, params: P) -> Result<()>
123    where
124        P: IntoIterator,
125        P::Item: ToSql,
126    {
127        for p in params.into_iter() {
128            self.bind_parameter(&p)?;
129        }
130        Ok(())
131    }
132
133    fn bind_parameter<P: ?Sized + ToSql>(&self, param: &P) -> Result<()> {
134        let value = param.to_sql()?;
135
136        let ptr = self.app;
137        let value = match value {
138            ToSqlOutput::Borrowed(v) => v,
139            ToSqlOutput::Owned(ref v) => ValueRef::from(v),
140        };
141        // NOTE: we ignore the return value here
142        //       because if anything failed, end_row will fail
143        // TODO: append more
144        let rc = match value {
145            ValueRef::Null => unsafe { ffi::duckdb_append_null(ptr) },
146            ValueRef::Boolean(i) => unsafe { ffi::duckdb_append_bool(ptr, i) },
147            ValueRef::TinyInt(i) => unsafe { ffi::duckdb_append_int8(ptr, i) },
148            ValueRef::SmallInt(i) => unsafe { ffi::duckdb_append_int16(ptr, i) },
149            ValueRef::Int(i) => unsafe { ffi::duckdb_append_int32(ptr, i) },
150            ValueRef::BigInt(i) => unsafe { ffi::duckdb_append_int64(ptr, i) },
151            ValueRef::UTinyInt(i) => unsafe { ffi::duckdb_append_uint8(ptr, i) },
152            ValueRef::USmallInt(i) => unsafe { ffi::duckdb_append_uint16(ptr, i) },
153            ValueRef::UInt(i) => unsafe { ffi::duckdb_append_uint32(ptr, i) },
154            ValueRef::UBigInt(i) => unsafe { ffi::duckdb_append_uint64(ptr, i) },
155            ValueRef::HugeInt(i) => unsafe {
156                let hi = ffi::duckdb_hugeint {
157                    lower: i as u64,
158                    upper: (i >> 64) as i64,
159                };
160                ffi::duckdb_append_hugeint(ptr, hi)
161            },
162
163            ValueRef::Float(r) => unsafe { ffi::duckdb_append_float(ptr, r) },
164            ValueRef::Double(r) => unsafe { ffi::duckdb_append_double(ptr, r) },
165            ValueRef::Text(s) => unsafe {
166                ffi::duckdb_append_varchar_length(ptr, s.as_ptr() as *const c_char, s.len() as u64)
167            },
168            ValueRef::Timestamp(u, i) => unsafe {
169                ffi::duckdb_append_timestamp(ptr, ffi::duckdb_timestamp { micros: u.to_micros(i) })
170            },
171            ValueRef::Blob(b) => unsafe { ffi::duckdb_append_blob(ptr, b.as_ptr() as *const c_void, b.len() as u64) },
172            ValueRef::Date32(d) => unsafe { ffi::duckdb_append_date(ptr, ffi::duckdb_date { days: d }) },
173            ValueRef::Time64(u, v) => unsafe {
174                ffi::duckdb_append_time(ptr, ffi::duckdb_time { micros: u.to_micros(v) })
175            },
176            ValueRef::Interval { months, days, nanos } => unsafe {
177                ffi::duckdb_append_interval(
178                    ptr,
179                    ffi::duckdb_interval {
180                        months,
181                        days,
182                        micros: nanos / 1000,
183                    },
184                )
185            },
186            _ => unreachable!("not supported"),
187        };
188        if rc != 0 {
189            return Err(Error::AppendError);
190        }
191        Ok(())
192    }
193
194    #[inline]
195    pub(super) fn new(conn: &Connection, app: ffi::duckdb_appender) -> Appender<'_> {
196        Appender { conn, app }
197    }
198
199    /// Flush data into DB
200    #[inline]
201    pub fn flush(&mut self) -> Result<()> {
202        unsafe {
203            let res = ffi::duckdb_appender_flush(self.app);
204            result_from_duckdb_appender(res, &mut self.app)
205        }
206    }
207
208    /// Add a column to the appender's active column list.
209    ///
210    /// When columns are added, only those columns need values during append.
211    /// Other columns will use their DEFAULT value (or NULL if no default).
212    ///
213    /// This flushes any pending data before modifying the column list.
214    #[inline]
215    pub fn add_column(&mut self, name: &str) -> Result<()> {
216        let c_name = std::ffi::CString::new(name)?;
217        let rc = unsafe { ffi::duckdb_appender_add_column(self.app, c_name.as_ptr() as *const c_char) };
218        result_from_duckdb_appender(rc, &mut self.app)
219    }
220
221    /// Clear the appender's active column list.
222    ///
223    /// After clearing, all columns become active again and values must be
224    /// provided for every column during append.
225    ///
226    /// This flushes any pending data before clearing.
227    #[inline]
228    pub fn clear_columns(&mut self) -> Result<()> {
229        let rc = unsafe { ffi::duckdb_appender_clear_columns(self.app) };
230        result_from_duckdb_appender(rc, &mut self.app)
231    }
232}
233
234impl Drop for Appender<'_> {
235    fn drop(&mut self) {
236        if !self.app.is_null() {
237            unsafe {
238                ffi::duckdb_appender_destroy(&mut self.app);
239            }
240        }
241    }
242}
243
244impl fmt::Debug for Appender<'_> {
245    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246        f.debug_struct("Appender").field("conn", self.conn).finish()
247    }
248}
249
#[cfg(test)]
mod test {
    //! Round-trip tests: rows are appended with `Appender`, then read back
    //! with regular queries.

    use crate::{params, Connection, Error, Result};

    #[test]
    fn test_append_one_row() -> Result<()> {
        let db = Connection::open_in_memory()?;
        db.execute_batch("CREATE TABLE foo(x INTEGER)")?;

        {
            let mut app = db.appender("foo")?;
            app.append_row([42])?;
        }

        let val = db.query_row("SELECT x FROM foo", [], |row| <(i32,)>::try_from(row))?;
        assert_eq!(val, (42,));
        Ok(())
    }

    #[test]
    fn test_append_rows() -> Result<()> {
        let db = Connection::open_in_memory()?;
        db.execute_batch("CREATE TABLE foo(x INTEGER, y INTEGER)")?;

        {
            let mut app = db.appender("foo")?;
            app.append_rows([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]])?;
        }

        let val = db.query_row("SELECT sum(x), sum(y) FROM foo", [], |row| <(i32, i32)>::try_from(row))?;
        assert_eq!(val, (25, 30));
        Ok(())
    }

    #[cfg(feature = "uuid")]
    #[test]
    fn test_append_uuid() -> Result<()> {
        use uuid::Uuid;

        let db = Connection::open_in_memory()?;
        db.execute_batch("CREATE TABLE foo(x UUID)")?;

        let id = Uuid::new_v4();
        {
            let mut app = db.appender("foo")?;
            app.append_row([id])?;
        }

        let val = db.query_row("SELECT x FROM foo", [], |row| <(Uuid,)>::try_from(row))?;
        assert_eq!(val, (id,));
        Ok(())
    }

    #[test]
    fn test_append_string_as_ts_row() -> Result<()> {
        let db = Connection::open_in_memory()?;
        db.execute_batch("CREATE TABLE foo(x TIMESTAMP)")?;

        {
            let mut app = db.appender("foo")?;
            app.append_row(["2022-04-09 15:56:37.544"])?;
        }

        let val = db.query_row("SELECT x FROM foo", [], |row| <(i64,)>::try_from(row))?;
        assert_eq!(val, (1649519797544000,));
        Ok(())
    }

    #[test]
    fn test_append_timestamp() -> Result<()> {
        use std::time::Duration;
        let db = Connection::open_in_memory()?;
        db.execute_batch("CREATE TABLE foo(x TIMESTAMP)")?;

        let d = Duration::from_secs(1);
        {
            let mut app = db.appender("foo")?;
            app.append_row([d])?;
        }

        let val = db.query_row("SELECT x FROM foo where x=?", [d], |row| <(i32,)>::try_from(row))?;
        assert_eq!(val, (d.as_micros() as i32,));
        Ok(())
    }

    #[test]
    #[cfg(feature = "chrono")]
    fn test_append_datetime() -> Result<()> {
        use chrono::{NaiveDate, NaiveDateTime};

        let db = Connection::open_in_memory()?;
        db.execute_batch("CREATE TABLE foo(x DATE, y TIMESTAMP)")?;

        let date = NaiveDate::from_ymd_opt(2024, 6, 5).unwrap();
        let timestamp = date.and_hms_opt(18, 26, 53).unwrap();
        {
            let mut app = db.appender("foo")?;
            app.append_row(params![date, timestamp])?;
        }
        let (date2, timestamp2) = db.query_row("SELECT x, y FROM foo", [], |row| {
            Ok((row.get::<_, NaiveDate>(0)?, row.get::<_, NaiveDateTime>(1)?))
        })?;
        assert_eq!(date, date2);
        assert_eq!(timestamp, timestamp2);
        Ok(())
    }

    #[test]
    #[cfg(feature = "chrono")]
    fn test_append_struct_with_params() -> Result<()> {
        use chrono::NaiveDate;

        struct Person {
            first_name: String,
            last_name: String,
            dob: NaiveDate,
        }

        let db = Connection::open_in_memory()?;

        db.execute_batch("CREATE TABLE foo(first_name VARCHAR, last_name VARCHAR, dob DATE);")?;

        let person1 = Person {
            first_name: String::from("John"),
            last_name: String::from("Smith"),
            dob: NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(),
        };

        let person2 = Person {
            first_name: String::from("Jane"),
            last_name: String::from("Smith"),
            dob: NaiveDate::from_ymd_opt(1975, 1, 1).unwrap(),
        };

        // Use params! to extract struct fields
        {
            let persons = [&person1, &person2];
            let mut app = db.appender("foo")?;
            for p in persons {
                app.append_row(params![&p.first_name, &p.last_name, p.dob])?;
            }
        }

        let count: i64 = db.query_row("SELECT count(*) FROM foo", [], |row| row.get(0))?;
        assert_eq!(count, 2);

        // Check the stored values too, not just the row count, so a swapped or
        // corrupted field would be caught.
        let rows: Vec<(String, String, NaiveDate)> = db
            .prepare("SELECT first_name, last_name, dob FROM foo ORDER BY dob")?
            .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?
            .collect::<Result<_>>()?;
        assert_eq!(
            rows,
            vec![
                (person1.first_name.clone(), person1.last_name.clone(), person1.dob),
                (person2.first_name.clone(), person2.last_name.clone(), person2.dob),
            ]
        );

        Ok(())
    }

    #[test]
    fn test_appender_error() -> Result<()> {
        let conn = Connection::open_in_memory()?;
        conn.execute(
            r"CREATE TABLE foo (
            foobar TEXT,
            foobar_int INT,
            foobar_split TEXT[] AS (split(trim(foobar), ','))
            );",
            [],
        )?;
        let mut appender = conn.appender("foo")?;
        match appender.append_row(params!["foo"]) {
            Err(Error::DuckDBFailure(.., Some(msg))) => {
                assert_eq!(msg, "Call to EndRow before all columns have been appended to!")
            }
            Err(err) => panic!("unexpected error: {err:?}"),
            Ok(_) => panic!("expected an error but got Ok"),
        }
        Ok(())
    }

    #[test]
    fn test_appender_foreign_key_constraint() -> Result<()> {
        let conn = Connection::open_in_memory()?;
        conn.execute_batch(
            r"
            CREATE TABLE parent (id INTEGER PRIMARY KEY);
            CREATE TABLE child (
                id INTEGER,
                parent_id INTEGER,
                FOREIGN KEY (parent_id) REFERENCES parent(id)
            );",
        )?;
        conn.execute("INSERT INTO parent VALUES (1)", [])?;

        let mut appender = conn.appender("child")?;
        appender.append_row(params![1, 999])?; // Invalid parent_id

        // Foreign key constraint should be checked during flush
        match appender.flush() {
            Err(Error::DuckDBFailure(_, Some(msg))) => {
                assert_eq!(
                    msg,
                    "Failed to append: Violates foreign key constraint because key \"id: 999\" does not exist in the referenced table"
                );
            }
            Err(e) => panic!("Expected foreign key constraint error, got: {e:?}"),
            Ok(_) => panic!("Expected foreign key constraint error, but flush succeeded"),
        }

        Ok(())
    }

    #[test]
    fn test_appender_defaults_and_column_switching() -> Result<()> {
        let db = Connection::open_in_memory()?;
        db.execute_batch("CREATE TABLE foo(a INT DEFAULT 99, b INT, c INT DEFAULT 7)")?;

        // Only provide column b; a and c should use their defaults
        {
            let mut app = db.appender_with_columns("foo", &["b"])?;
            app.append_row([Some(1)])?;
            app.append_row([Option::<i32>::None])?;
        }

        // Switch to a different active column set, then back to full width
        {
            let mut app = db.appender("foo")?;
            app.add_column("c")?;
            app.add_column("a")?;
            app.append_row([10, 1])?; // set c and a; b gets NULL

            app.clear_columns()?; // revert to all columns
            app.append_row([2, 3, 4])?;
        }

        let rows: Vec<(i32, Option<i32>, i32)> = db
            .prepare("SELECT a, b, c FROM foo ORDER BY a, b NULLS LAST")?
            .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?
            .collect::<Result<_>>()?;

        assert_eq!(
            rows,
            vec![
                (1, None, 10),    // add_column path; b NULL, c set
                (2, Some(3), 4),  // clear_columns path; all provided
                (99, Some(1), 7), // defaults applied for a and c
                (99, None, 7)     // default + NULL
            ]
        );

        Ok(())
    }

    #[test]
    fn test_appender_with_columns_sequence_default() -> Result<()> {
        let db = Connection::open_in_memory()?;
        db.execute_batch(
            "CREATE SEQUENCE seq START 1;
             CREATE TABLE foo(id INTEGER DEFAULT nextval('seq'), name TEXT)",
        )?;

        {
            let mut app = db.appender_with_columns("foo", &["name"])?;
            app.append_row(["Alice"])?;
            app.append_row(["Bob"])?;
            app.append_row(["Charlie"])?;
        }

        let rows: Vec<(i32, String)> = db
            .prepare("SELECT id, name FROM foo ORDER BY id")?
            .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
            .collect::<Result<_>>()?;

        assert_eq!(
            rows,
            vec![(1, "Alice".into()), (2, "Bob".into()), (3, "Charlie".into())]
        );

        Ok(())
    }

    #[test]
    fn test_appender_with_columns_to_db_schema() -> Result<()> {
        let db = Connection::open_in_memory()?;
        db.execute_batch(
            "CREATE SCHEMA s;
             CREATE TABLE s.foo(a INTEGER DEFAULT 5, b INTEGER)",
        )?;

        {
            let mut app = db.appender_with_columns_to_db("foo", "s", &["b"])?;
            app.append_row([7])?;
        }

        let (a, b): (i32, i32) = db.query_row("SELECT a, b FROM s.foo", [], |row| Ok((row.get(0)?, row.get(1)?)))?;
        assert_eq!((a, b), (5, 7));
        Ok(())
    }

    #[test]
    fn test_appender_with_columns_to_catalog_and_db() -> Result<()> {
        let db = Connection::open_in_memory()?;
        db.execute_batch(
            "CREATE SCHEMA s;
             CREATE TABLE s.bar(a INTEGER DEFAULT 11, b INTEGER)",
        )?;

        {
            // Default in-memory catalog is "memory"
            let mut app = db.appender_with_columns_to_catalog_and_db("bar", "memory", "s", &["b"])?;
            app.append_row([9])?;
        }

        let (a, b): (i32, i32) = db.query_row("SELECT a, b FROM s.bar", [], |row| Ok((row.get(0)?, row.get(1)?)))?;
        assert_eq!((a, b), (11, 9));
        Ok(())
    }
}