csv_async/async_writers/aser_tokio.rs
1use std::result;
2
3use tokio::io::{self, AsyncWrite, AsyncWriteExt};
4use serde::Serialize;
5
6use crate::AsyncWriterBuilder;
7use crate::error::{IntoInnerError, Result};
8use super::mwtr_serde::MemWriter;
9
10impl AsyncWriterBuilder {
11 /// Build a CSV `serde` serializer from this configuration that writes data to `ser`.
12 ///
13 /// Note that the CSV serializer is buffered automatically, so you should not
14 /// wrap `ser` in a buffered writer.
15 ///
16 /// # Example
17 ///
18 /// ```
19 /// use std::error::Error;
20 /// use csv_async::AsyncWriterBuilder;
21 /// use serde::Serialize;
22 ///
23 /// #[derive(Serialize)]
24 /// struct Row<'a> {
25 /// name: &'a str,
26 /// x: u64,
27 /// y: u64,
28 /// }
29 ///
30 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
31 /// async fn example() -> Result<(), Box<dyn Error>> {
32 /// let mut ser = AsyncWriterBuilder::new().has_headers(false).create_serializer(vec![]);
33 /// ser.serialize(Row {name: "p1", x: 1, y: 2}).await?;
34 /// ser.serialize(Row {name: "p2", x: 3, y: 4}).await?;
35 ///
36 /// let data = String::from_utf8(ser.into_inner().await?)?;
37 /// assert_eq!(data, "p1,1,2\np2,3,4\n");
38 /// Ok(())
39 /// }
40 /// ```
41 pub fn create_serializer<W: AsyncWrite + Unpin>(&self, wtr: W) -> AsyncSerializer<W> {
42 AsyncSerializer::new(self, wtr)
43 }
44}
45
/// An already configured CSV `serde` serializer for the `tokio` runtime.
///
/// A CSV serializer takes as input Rust structures that implement the
/// `serde::Serialize` trait and writes that data out as valid CSV.
///
/// While CSV writing is considerably easier than parsing CSV, a proper writer
/// will do a number of things for you:
///
/// 1. Quote fields when necessary.
/// 2. Check that all records have the same number of fields.
/// 3. Write records with a single empty field correctly.
/// 4. Automatically serialize normal Rust types to CSV records. When that
///    type is a struct, a header row is automatically written corresponding
///    to the fields of that struct.
/// 5. Use buffering intelligently and otherwise avoid allocation. (This means
///    that callers should not do their own buffering.)
///
/// All of the above can be configured using an
/// [`AsyncWriterBuilder`](struct.AsyncWriterBuilder.html).
/// However, an `AsyncSerializer` has a convenient constructor (`from_writer`)
/// that uses the default configuration.
///
/// Note that the default configuration of an `AsyncSerializer` uses `\n` for record
/// terminators instead of `\r\n` as specified by RFC 4180. Use the
/// `terminator` method on `AsyncWriterBuilder` to set the terminator to `\r\n` if
/// it's desired.
#[derive(Debug)]
pub struct AsyncSerializer<W: AsyncWrite + Unpin> {
    // In-memory writer that records are serialized into; its bytes (`data()`)
    // are then copied to `asy_wtr` and the buffer is cleared.
    ser_wtr: MemWriter,
    // Destination writer. Set to `Some` on construction and taken (left as
    // `None`) by `into_inner`; it is an `Option` because a type with a `Drop`
    // impl cannot have fields moved out of it.
    asy_wtr: Option<W>,
}
77
78impl<W: AsyncWrite + Unpin> Drop for AsyncSerializer<W> {
79 fn drop(&mut self) {
80 // We ignore result of flush() call while dropping
81 // Well known problem.
82 // If you care about flush result call it explicitly
83 // before AsyncSerializer goes out of scope,
84 // second flush() call should be no op.
85 let _ = futures::executor::block_on(self.flush());
86 }
87}
88
89impl<W: AsyncWrite + Unpin> AsyncSerializer<W> {
90 fn new(builder: &AsyncWriterBuilder, wtr: W) -> Self {
91 AsyncSerializer {
92 ser_wtr: MemWriter::new(builder),
93 asy_wtr: Some(wtr),
94 }
95 }
96
97 /// Build a CSV serializer with a default configuration that writes data to
98 /// `ser`.
99 ///
100 /// Note that the CSV serializer is buffered automatically, so you should not
101 /// wrap `ser` in a buffered writer.
102 ///
103 /// # Example
104 ///
105 /// ```
106 /// use std::error::Error;
107 /// use csv_async::AsyncSerializer;
108 /// use serde::Serialize;
109 ///
110 /// #[derive(Serialize)]
111 /// struct Row<'a> {
112 /// name: &'a str,
113 /// x: u64,
114 /// y: u64,
115 /// }
116 ///
117 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
118 /// async fn example() -> Result<(), Box<dyn Error>> {
119 /// let mut ser = AsyncSerializer::from_writer(vec![]);
120 /// ser.serialize(Row {name: "p1", x: 1, y: 2}).await?;
121 /// ser.serialize(Row {name: "p2", x: 3, y: 4}).await?;
122 ///
123 /// let data = String::from_utf8(ser.into_inner().await?)?;
124 /// assert_eq!(data, "name,x,y\np1,1,2\np2,3,4\n");
125 /// Ok(())
126 /// }
127 /// ```
128 pub fn from_writer(wtr: W) -> AsyncSerializer<W> {
129 AsyncWriterBuilder::new().create_serializer(wtr)
130 }
131
132 /// Serialize a single record using Serde.
133 ///
134 /// # Example
135 ///
136 /// This shows how to serialize normal Rust structs as CSV records. The
137 /// fields of the struct are used to write a header row automatically.
138 /// (Writing the header row automatically can be disabled by building the
139 /// CSV writer with a [`WriterBuilder`](struct.WriterBuilder.html) and
140 /// calling the `has_headers` method.)
141 ///
142 /// ```
143 /// use std::error::Error;
144 /// use csv_async::AsyncSerializer;
145 /// use serde::Serialize;
146 ///
147 /// #[derive(Serialize)]
148 /// struct Row<'a> {
149 /// city: &'a str,
150 /// country: &'a str,
151 /// // Serde allows us to name our headers exactly,
152 /// // even if they don't match our struct field names.
153 /// #[serde(rename = "popcount")]
154 /// population: u64,
155 /// }
156 ///
157 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
158 /// async fn example() -> Result<(), Box<dyn Error>> {
159 /// let mut ser = AsyncSerializer::from_writer(vec![]);
160 /// ser.serialize(Row {
161 /// city: "Boston",
162 /// country: "United States",
163 /// population: 4628910,
164 /// }).await?;
165 /// ser.serialize(Row {
166 /// city: "Concord",
167 /// country: "United States",
168 /// population: 42695,
169 /// }).await?;
170 ///
171 /// let data = String::from_utf8(ser.into_inner().await?)?;
172 /// assert_eq!(data, indoc::indoc! {"
173 /// city,country,popcount
174 /// Boston,United States,4628910
175 /// Concord,United States,42695
176 /// "});
177 /// Ok(())
178 /// }
179 /// ```
180 ///
181 /// # Rules
182 ///
183 /// The behavior of `serialize` is fairly simple:
184 ///
185 /// 1. Nested containers (tuples, `Vec`s, structs, etc.) are always
186 /// flattened (depth-first order).
187 ///
188 /// 2. If `has_headers` is `true` and the type contains field names, then
189 /// a header row is automatically generated.
190 ///
191 /// However, some container types cannot be serialized, and if
192 /// `has_headers` is `true`, there are some additional restrictions on the
193 /// types that can be serialized. See below for details.
194 ///
195 /// For the purpose of this section, Rust types can be divided into three
196 /// categories: scalars, non-struct containers, and structs.
197 ///
198 /// ## Scalars
199 ///
200 /// Single values with no field names are written like the following. Note
201 /// that some of the outputs may be quoted, according to the selected
202 /// quoting style.
203 ///
204 /// | Name | Example Type | Example Value | Output |
205 /// | ---- | ---- | ---- | ---- |
206 /// | boolean | `bool` | `true` | `true` |
207 /// | integers | `i8`, `i16`, `i32`, `i64`, `i128`, `u8`, `u16`, `u32`, `u64`, `u128` | `5` | `5` |
208 /// | floats | `f32`, `f64` | `3.14` | `3.14` |
209 /// | character | `char` | `'☃'` | `☃` |
210 /// | string | `&str` | `"hi"` | `hi` |
211 /// | bytes | `&[u8]` | `b"hi"[..]` | `hi` |
212 /// | option | `Option` | `None` | *empty* |
213 /// | option | | `Some(5)` | `5` |
214 /// | unit | `()` | `()` | *empty* |
215 /// | unit struct | `struct Foo;` | `Foo` | `Foo` |
216 /// | unit enum variant | `enum E { A, B }` | `E::A` | `A` |
217 /// | newtype struct | `struct Foo(u8);` | `Foo(5)` | `5` |
218 /// | newtype enum variant | `enum E { A(u8) }` | `E::A(5)` | `5` |
219 ///
220 /// Note that this table includes simple structs and enums. For example, to
221 /// serialize a field from either an integer or a float type, one can do
222 /// this:
223 ///
224 /// ```
225 /// use std::error::Error;
226 ///
227 /// use csv_async::AsyncSerializer;
228 /// use serde::Serialize;
229 ///
230 /// #[derive(Serialize)]
231 /// struct Row {
232 /// label: String,
233 /// value: Value,
234 /// }
235 ///
236 /// #[derive(Serialize)]
237 /// enum Value {
238 /// Integer(i64),
239 /// Float(f64),
240 /// }
241 ///
242 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
243 /// async fn example() -> Result<(), Box<dyn Error>> {
244 /// let mut ser = AsyncSerializer::from_writer(vec![]);
245 /// ser.serialize(Row {
246 /// label: "foo".to_string(),
247 /// value: Value::Integer(3),
248 /// }).await?;
249 /// ser.serialize(Row {
250 /// label: "bar".to_string(),
251 /// value: Value::Float(3.14),
252 /// }).await?;
253 ///
254 /// let data = String::from_utf8(ser.into_inner().await?)?;
255 /// assert_eq!(data, indoc::indoc! {"
256 /// label,value
257 /// foo,3
258 /// bar,3.14
259 /// "});
260 /// Ok(())
261 /// }
262 /// ```
263 ///
264 /// ## Non-Struct Containers
265 ///
266 /// Nested containers are flattened to their scalar components, with the
267 /// exception of a few types that are not allowed:
268 ///
269 /// | Name | Example Type | Example Value | Output |
270 /// | ---- | ---- | ---- | ---- |
271 /// | sequence | `Vec<u8>` | `vec![1, 2, 3]` | `1,2,3` |
272 /// | tuple | `(u8, bool)` | `(5, true)` | `5,true` |
273 /// | tuple struct | `Foo(u8, bool)` | `Foo(5, true)` | `5,true` |
274 /// | tuple enum variant | `enum E { A(u8, bool) }` | `E::A(5, true)` | *error* |
275 /// | struct enum variant | `enum E { V { a: u8, b: bool } }` | `E::V { a: 5, b: true }` | *error* |
276 /// | map | `BTreeMap<K, V>` | `BTreeMap::new()` | *error* |
277 ///
278 /// ## Structs
279 ///
280 /// Like the other containers, structs are flattened to their scalar
281 /// components:
282 ///
283 /// | Name | Example Type | Example Value | Output |
284 /// | ---- | ---- | ---- | ---- |
285 /// | struct | `struct Foo { a: u8, b: bool }` | `Foo { a: 5, b: true }` | `5,true` |
286 ///
287 /// If `has_headers` is `false`, then there are no additional restrictions;
288 /// types can be nested arbitrarily. For example:
289 ///
290 /// ```
291 /// use std::error::Error;
292 /// use csv_async::AsyncWriterBuilder;
293 /// use serde::Serialize;
294 ///
295 /// #[derive(Serialize)]
296 /// struct Row {
297 /// label: String,
298 /// values: Vec<f64>,
299 /// }
300 ///
301 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
302 /// async fn example() -> Result<(), Box<dyn Error>> {
303 /// let mut ser = AsyncWriterBuilder::new()
304 /// .has_headers(false)
305 /// .create_serializer(vec![]);
306 /// ser.serialize(Row {
307 /// label: "foo".to_string(),
308 /// values: vec![1.1234, 2.5678, 3.14],
309 /// }).await?;
310 ///
311 /// let data = String::from_utf8(ser.into_inner().await?)?;
312 /// assert_eq!(data, indoc::indoc! {"
313 /// foo,1.1234,2.5678,3.14
314 /// "});
315 /// Ok(())
316 /// }
317 /// ```
318 ///
319 /// However, if `has_headers` were enabled in the above example, then
320 /// serialization would return an error. Specifically, when `has_headers` is
321 /// `true`, there are two restrictions:
322 ///
323 /// 1. Named field values in structs must be scalars.
324 ///
325 /// 2. All scalars must be named field values in structs.
326 ///
327 /// Other than these two restrictions, types can be nested arbitrarily.
328 /// Here are a few examples:
329 ///
330 /// | Value | Header | Record |
331 /// | ---- | ---- | ---- |
332 /// | `(Foo { x: 5, y: 6 }, Bar { z: true })` | `x,y,z` | `5,6,true` |
333 /// | `vec![Foo { x: 5, y: 6 }, Foo { x: 7, y: 8 }]` | `x,y,x,y` | `5,6,7,8` |
334 /// | `(Foo { x: 5, y: 6 }, vec![Bar { z: Baz(true) }])` | `x,y,z` | `5,6,true` |
335 /// | `Foo { x: 5, y: (6, 7) }` | *error: restriction 1* | `5,6,7` |
336 /// | `(5, Foo { x: 6, y: 7 }` | *error: restriction 2* | `5,6,7` |
337 /// | `(Foo { x: 5, y: 6 }, true)` | *error: restriction 2* | `5,6,true` |
338 pub async fn serialize<S: Serialize>(&mut self, record: S) -> Result<()> {
339 self.ser_wtr.serialize(record)?;
340 self.ser_wtr.flush()?;
341 self.asy_wtr.as_mut().unwrap().write_all(self.ser_wtr.data()).await?;
342 self.ser_wtr.clear();
343 Ok(())
344 }
345
346 /// Flushes the underlying asynchronous writer.
347 pub async fn flush(&mut self) -> io::Result<()> {
348 if let Some(ref mut asy_wtr) = self.asy_wtr {
349 asy_wtr.flush().await?;
350 }
351 Ok(())
352 }
353
354 /// Flush the contents of the internal buffer and return the underlying
355 /// writer.
356 pub async fn into_inner(
357 mut self,
358 ) -> result::Result<W, IntoInnerError<AsyncSerializer<W>>> {
359 match self.flush().await {
360 Ok(()) => Ok(self.asy_wtr.take().unwrap()),
361 Err(err) => Err(IntoInnerError::new(self, err)),
362 }
363 }
364}