csv_async/async_writers/aser_tokio.rs

1use std::result;
2
3use tokio::io::{self, AsyncWrite, AsyncWriteExt};
4use serde::Serialize;
5
6use crate::AsyncWriterBuilder;
7use crate::error::{IntoInnerError, Result};
8use super::mwtr_serde::MemWriter;
9
10impl AsyncWriterBuilder {
11    /// Build a CSV `serde` serializer from this configuration that writes data to `ser`.
12    ///
13    /// Note that the CSV serializer is buffered automatically, so you should not
14    /// wrap `ser` in a buffered writer.
15    ///
16    /// # Example
17    ///
18    /// ```
19    /// use std::error::Error;
20    /// use csv_async::AsyncWriterBuilder;
21    /// use serde::Serialize;
22    ///
23    /// #[derive(Serialize)]
24    /// struct Row<'a> {
25    ///     name: &'a str,
26    ///     x: u64,
27    ///     y: u64,
28    /// }
29    ///
30    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
31    /// async fn example() -> Result<(), Box<dyn Error>> {
32    ///     let mut ser = AsyncWriterBuilder::new().has_headers(false).create_serializer(vec![]);
33    ///     ser.serialize(Row {name: "p1", x: 1, y: 2}).await?;
34    ///     ser.serialize(Row {name: "p2", x: 3, y: 4}).await?;
35    ///
36    ///     let data = String::from_utf8(ser.into_inner().await?)?;
37    ///     assert_eq!(data, "p1,1,2\np2,3,4\n");
38    ///     Ok(())
39    /// }
40    /// ```
41    pub fn create_serializer<W: AsyncWrite + Unpin>(&self, wtr: W) -> AsyncSerializer<W> {
42        AsyncSerializer::new(self, wtr)
43    }
44}
45
46/// An already configured CSV `serde` serializer for `tokio` runtime.
47///
48/// A CSV serializer takes as input Rust structures that implement `serde::Serialize` trait
49/// and writes those data in a valid CSV output.
50///
51/// While CSV writing is considerably easier than parsing CSV, a proper writer
52/// will do a number of things for you:
53///
54/// 1. Quote fields when necessary.
55/// 2. Check that all records have the same number of fields.
56/// 3. Write records with a single empty field correctly.
57/// 4. Automatically serialize normal Rust types to CSV records. When that
58///    type is a struct, a header row is automatically written corresponding
59///    to the fields of that struct.
60/// 5. Use buffering intelligently and otherwise avoid allocation. (This means
61///    that callers should not do their own buffering.)
62///
63/// All of the above can be configured using a
64/// [`AsyncWriterBuilder`](struct.AsyncWriterBuilder.html).
65/// However, a `AsyncSerializer` has convenient constructor (`from_writer`) 
66/// that use the default configuration.
67///
68/// Note that the default configuration of a `AsyncSerializer` uses `\n` for record
69/// terminators instead of `\r\n` as specified by RFC 4180. Use the
70/// `terminator` method on `AsyncWriterBuilder` to set the terminator to `\r\n` if
71/// it's desired.
72#[derive(Debug)]
73pub struct AsyncSerializer<W: AsyncWrite + Unpin> {
74    ser_wtr: MemWriter,
75    asy_wtr: Option<W>,
76}
77
78impl<W: AsyncWrite + Unpin> Drop for AsyncSerializer<W> {
79    fn drop(&mut self) {
80        // We ignore result of flush() call while dropping
81        // Well known problem.
82        // If you care about flush result call it explicitly 
83        // before AsyncSerializer goes out of scope,
84        // second flush() call should be no op.
85        let _ = futures::executor::block_on(self.flush());
86    }
87}
88
89impl<W: AsyncWrite + Unpin> AsyncSerializer<W> {
90    fn new(builder: &AsyncWriterBuilder, wtr: W) -> Self {
91        AsyncSerializer {
92            ser_wtr: MemWriter::new(builder),
93            asy_wtr: Some(wtr),
94        }
95    }
96
97    /// Build a CSV serializer with a default configuration that writes data to
98    /// `ser`.
99    ///
100    /// Note that the CSV serializer is buffered automatically, so you should not
101    /// wrap `ser` in a buffered writer.
102    ///
103    /// # Example
104    ///
105    /// ```
106    /// use std::error::Error;
107    /// use csv_async::AsyncSerializer;
108    /// use serde::Serialize;
109    ///
110    /// #[derive(Serialize)]
111    /// struct Row<'a> {
112    ///     name: &'a str,
113    ///     x: u64,
114    ///     y: u64,
115    /// }
116    ///
117    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
118    /// async fn example() -> Result<(), Box<dyn Error>> {
119    ///     let mut ser = AsyncSerializer::from_writer(vec![]);
120    ///     ser.serialize(Row {name: "p1", x: 1, y: 2}).await?;
121    ///     ser.serialize(Row {name: "p2", x: 3, y: 4}).await?;
122    ///
123    ///     let data = String::from_utf8(ser.into_inner().await?)?;
124    ///     assert_eq!(data, "name,x,y\np1,1,2\np2,3,4\n");
125    ///     Ok(())
126    /// }
127    /// ```
128    pub fn from_writer(wtr: W) -> AsyncSerializer<W> {
129        AsyncWriterBuilder::new().create_serializer(wtr)
130    }
131
132    /// Serialize a single record using Serde.
133    ///
134    /// # Example
135    ///
136    /// This shows how to serialize normal Rust structs as CSV records. The
137    /// fields of the struct are used to write a header row automatically.
138    /// (Writing the header row automatically can be disabled by building the
139    /// CSV writer with a [`WriterBuilder`](struct.WriterBuilder.html) and
140    /// calling the `has_headers` method.)
141    ///
142    /// ```
143    /// use std::error::Error;
144    /// use csv_async::AsyncSerializer;
145    /// use serde::Serialize;
146    ///
147    /// #[derive(Serialize)]
148    /// struct Row<'a> {
149    ///     city: &'a str,
150    ///     country: &'a str,
151    ///     // Serde allows us to name our headers exactly,
152    ///     // even if they don't match our struct field names.
153    ///     #[serde(rename = "popcount")]
154    ///     population: u64,
155    /// }
156    ///
157    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
158    /// async fn example() -> Result<(), Box<dyn Error>> {
159    ///     let mut ser = AsyncSerializer::from_writer(vec![]);
160    ///     ser.serialize(Row {
161    ///         city: "Boston",
162    ///         country: "United States",
163    ///         population: 4628910,
164    ///     }).await?;
165    ///     ser.serialize(Row {
166    ///         city: "Concord",
167    ///         country: "United States",
168    ///         population: 42695,
169    ///     }).await?;
170    ///
171    ///     let data = String::from_utf8(ser.into_inner().await?)?;
172    ///     assert_eq!(data, indoc::indoc! {"
173    ///         city,country,popcount
174    ///         Boston,United States,4628910
175    ///         Concord,United States,42695
176    ///     "});
177    ///     Ok(())
178    /// }
179    /// ```
180    ///
181    /// # Rules
182    ///
183    /// The behavior of `serialize` is fairly simple:
184    ///
185    /// 1. Nested containers (tuples, `Vec`s, structs, etc.) are always
186    ///    flattened (depth-first order).
187    ///
188    /// 2. If `has_headers` is `true` and the type contains field names, then
189    ///    a header row is automatically generated.
190    ///
191    /// However, some container types cannot be serialized, and if
192    /// `has_headers` is `true`, there are some additional restrictions on the
193    /// types that can be serialized. See below for details.
194    ///
195    /// For the purpose of this section, Rust types can be divided into three
196    /// categories: scalars, non-struct containers, and structs.
197    ///
198    /// ## Scalars
199    ///
200    /// Single values with no field names are written like the following. Note
201    /// that some of the outputs may be quoted, according to the selected
202    /// quoting style.
203    ///
204    /// | Name | Example Type | Example Value | Output |
205    /// | ---- | ---- | ---- | ---- |
206    /// | boolean | `bool` | `true` | `true` |
207    /// | integers | `i8`, `i16`, `i32`, `i64`, `i128`, `u8`, `u16`, `u32`, `u64`, `u128` | `5` | `5` |
208    /// | floats | `f32`, `f64` | `3.14` | `3.14` |
209    /// | character | `char` | `'☃'` | `☃` |
210    /// | string | `&str` | `"hi"` | `hi` |
211    /// | bytes | `&[u8]` | `b"hi"[..]` | `hi` |
212    /// | option | `Option` | `None` | *empty* |
213    /// | option |          | `Some(5)` | `5` |
214    /// | unit | `()` | `()` | *empty* |
215    /// | unit struct | `struct Foo;` | `Foo` | `Foo` |
216    /// | unit enum variant | `enum E { A, B }` | `E::A` | `A` |
217    /// | newtype struct | `struct Foo(u8);` | `Foo(5)` | `5` |
218    /// | newtype enum variant | `enum E { A(u8) }` | `E::A(5)` | `5` |
219    ///
220    /// Note that this table includes simple structs and enums. For example, to
221    /// serialize a field from either an integer or a float type, one can do
222    /// this:
223    ///
224    /// ```
225    /// use std::error::Error;
226    ///
227    /// use csv_async::AsyncSerializer;
228    /// use serde::Serialize;
229    ///
230    /// #[derive(Serialize)]
231    /// struct Row {
232    ///     label: String,
233    ///     value: Value,
234    /// }
235    ///
236    /// #[derive(Serialize)]
237    /// enum Value {
238    ///     Integer(i64),
239    ///     Float(f64),
240    /// }
241    ///
242    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
243    /// async fn example() -> Result<(), Box<dyn Error>> {
244    ///     let mut ser = AsyncSerializer::from_writer(vec![]);
245    ///     ser.serialize(Row {
246    ///         label: "foo".to_string(),
247    ///         value: Value::Integer(3),
248    ///     }).await?;
249    ///     ser.serialize(Row {
250    ///         label: "bar".to_string(),
251    ///         value: Value::Float(3.14),
252    ///     }).await?;
253    ///
254    ///     let data = String::from_utf8(ser.into_inner().await?)?;
255    ///     assert_eq!(data, indoc::indoc! {"
256    ///         label,value
257    ///         foo,3
258    ///         bar,3.14
259    ///     "});
260    ///     Ok(())
261    /// }
262    /// ```
263    ///
264    /// ## Non-Struct Containers
265    ///
266    /// Nested containers are flattened to their scalar components, with the
267    /// exception of a few types that are not allowed:
268    ///
269    /// | Name | Example Type | Example Value | Output |
270    /// | ---- | ---- | ---- | ---- |
271    /// | sequence | `Vec<u8>` | `vec![1, 2, 3]` | `1,2,3` |
272    /// | tuple | `(u8, bool)` | `(5, true)` | `5,true` |
273    /// | tuple struct | `Foo(u8, bool)` | `Foo(5, true)` | `5,true` |
274    /// | tuple enum variant | `enum E { A(u8, bool) }` | `E::A(5, true)` | *error* |
275    /// | struct enum variant | `enum E { V { a: u8, b: bool } }` | `E::V { a: 5, b: true }` | *error* |
276    /// | map | `BTreeMap<K, V>` | `BTreeMap::new()` | *error* |
277    ///
278    /// ## Structs
279    ///
280    /// Like the other containers, structs are flattened to their scalar
281    /// components:
282    ///
283    /// | Name | Example Type | Example Value | Output |
284    /// | ---- | ---- | ---- | ---- |
285    /// | struct | `struct Foo { a: u8, b: bool }` | `Foo { a: 5, b: true }` | `5,true` |
286    ///
287    /// If `has_headers` is `false`, then there are no additional restrictions;
288    /// types can be nested arbitrarily. For example:
289    ///
290    /// ```
291    /// use std::error::Error;
292    /// use csv_async::AsyncWriterBuilder;
293    /// use serde::Serialize;
294    ///
295    /// #[derive(Serialize)]
296    /// struct Row {
297    ///     label: String,
298    ///     values: Vec<f64>,
299    /// }
300    ///
301    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
302    /// async fn example() -> Result<(), Box<dyn Error>> {
303    ///     let mut ser = AsyncWriterBuilder::new()
304    ///         .has_headers(false)
305    ///         .create_serializer(vec![]);
306    ///     ser.serialize(Row {
307    ///         label: "foo".to_string(),
308    ///         values: vec![1.1234, 2.5678, 3.14],
309    ///     }).await?;
310    ///
311    ///     let data = String::from_utf8(ser.into_inner().await?)?;
312    ///     assert_eq!(data, indoc::indoc! {"
313    ///         foo,1.1234,2.5678,3.14
314    ///     "});
315    ///     Ok(())
316    /// }
317    /// ```
318    ///
319    /// However, if `has_headers` were enabled in the above example, then
320    /// serialization would return an error. Specifically, when `has_headers` is
321    /// `true`, there are two restrictions:
322    ///
323    /// 1. Named field values in structs must be scalars.
324    ///
325    /// 2. All scalars must be named field values in structs.
326    ///
327    /// Other than these two restrictions, types can be nested arbitrarily.
328    /// Here are a few examples:
329    ///
330    /// | Value | Header | Record |
331    /// | ---- | ---- | ---- |
332    /// | `(Foo { x: 5, y: 6 }, Bar { z: true })` | `x,y,z` | `5,6,true` |
333    /// | `vec![Foo { x: 5, y: 6 }, Foo { x: 7, y: 8 }]` | `x,y,x,y` | `5,6,7,8` |
334    /// | `(Foo { x: 5, y: 6 }, vec![Bar { z: Baz(true) }])` | `x,y,z` | `5,6,true` |
335    /// | `Foo { x: 5, y: (6, 7) }` | *error: restriction 1* | `5,6,7` |
336    /// | `(5, Foo { x: 6, y: 7 }` | *error: restriction 2* | `5,6,7` |
337    /// | `(Foo { x: 5, y: 6 }, true)` | *error: restriction 2* | `5,6,true` |
338    pub async fn serialize<S: Serialize>(&mut self, record: S) -> Result<()> {
339        self.ser_wtr.serialize(record)?;
340        self.ser_wtr.flush()?;
341        self.asy_wtr.as_mut().unwrap().write_all(self.ser_wtr.data()).await?;
342        self.ser_wtr.clear();
343        Ok(())
344    }
345
346    /// Flushes the underlying asynchronous writer.
347    pub async fn flush(&mut self) -> io::Result<()> {
348        if let Some(ref mut asy_wtr) = self.asy_wtr {
349            asy_wtr.flush().await?;
350        }
351        Ok(())
352    }
353
354    /// Flush the contents of the internal buffer and return the underlying
355    /// writer.
356    pub async fn into_inner(
357        mut self,
358    ) -> result::Result<W, IntoInnerError<AsyncSerializer<W>>> {
359        match self.flush().await {
360            Ok(()) => Ok(self.asy_wtr.take().unwrap()),
361            Err(err) => Err(IntoInnerError::new(self, err)),
362        }
363    }
364}