csv_async/async_writers/
awtr_tokio.rs

1use std::result;
2
3use tokio::io::{self, AsyncWrite};
4
5use crate::AsyncWriterBuilder;
6use crate::byte_record::ByteRecord;
7use crate::error::Result;
8use super::AsyncWriterImpl;
9
10impl AsyncWriterBuilder {
11    /// Build a CSV writer from this configuration that writes data to `wtr`.
12    ///
13    /// Note that the CSV writer is buffered automatically, so you should not
14    /// wrap `wtr` in a buffered writer.
15    ///
16    /// # Example
17    ///
18    /// ```
19    /// use std::error::Error;
20    /// use csv_async::AsyncWriterBuilder;
21    ///
22    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
23    /// async fn example() -> Result<(), Box<dyn Error>> {
24    ///     let mut wtr = AsyncWriterBuilder::new().create_writer(vec![]);
25    ///     wtr.write_record(&["a", "b", "c"]).await?;
26    ///     wtr.write_record(&["x", "y", "z"]).await?;
27    ///
28    ///     let data = String::from_utf8(wtr.into_inner().await?)?;
29    ///     assert_eq!(data, "a,b,c\nx,y,z\n");
30    ///     Ok(())
31    /// }
32    /// ```
33    pub fn create_writer<W: AsyncWrite + Unpin>(&self, wtr: W) -> AsyncWriter<W> {
34        AsyncWriter::new(self, wtr)
35    }
36}
37
38/// An already configured CSV writer for `tokio` runtime.
39///
40/// A CSV writer takes as input Rust values and writes those values in a valid
41/// CSV format as output.
42///
43/// While CSV writing is considerably easier than parsing CSV, a proper writer
44/// will do a number of things for you:
45///
46/// 1. Quote fields when necessary.
47/// 2. Check that all records have the same number of fields.
48/// 3. Write records with a single empty field correctly.
49/// 4. Use buffering intelligently and otherwise avoid allocation. (This means
50///    that callers should not do their own buffering.)
51///
52/// All of the above can be configured using a
53/// [`AsyncWriterBuilder`](struct.AsyncWriterBuilder.html).
54/// However, a `AsyncWriter` has convenient constructor (from_writer`) 
55/// that use the default configuration.
56///
57/// Note that the default configuration of a `AsyncWriter` uses `\n` for record
58/// terminators instead of `\r\n` as specified by RFC 4180. Use the
59/// `terminator` method on `AsyncWriterBuilder` to set the terminator to `\r\n` if
60/// it's desired.
61#[derive(Debug)]
62pub struct AsyncWriter<W: AsyncWrite + Unpin>(AsyncWriterImpl<W>);
63
64impl<W: AsyncWrite + Unpin> AsyncWriter<W> {
65    fn new(builder: &AsyncWriterBuilder, wtr: W) -> AsyncWriter<W> {
66        AsyncWriter(AsyncWriterImpl::new(builder, wtr))
67    }
68
69    /// Build a CSV writer with a default configuration that writes data to
70    /// `wtr`.
71    ///
72    /// Note that the CSV writer is buffered automatically, so you should not
73    /// wrap `wtr` in a buffered writer.
74    ///
75    /// # Example
76    ///
77    /// ```
78    /// use std::error::Error;
79    /// use csv_async::AsyncWriter;
80    ///
81    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
82    /// async fn example() -> Result<(), Box<dyn Error>> {
83    ///     let mut wtr = AsyncWriter::from_writer(vec![]);
84    ///     wtr.write_record(&["a", "b", "c"]).await?;
85    ///     wtr.write_record(&["x", "y", "z"]).await?;
86    ///
87    ///     let data = String::from_utf8(wtr.into_inner().await?)?;
88    ///     assert_eq!(data, "a,b,c\nx,y,z\n");
89    ///     Ok(())
90    /// }
91    /// ```
92    pub fn from_writer(wtr: W) -> AsyncWriter<W> {
93        AsyncWriterBuilder::new().create_writer(wtr)
94    }
95
96    /// Write a single record.
97    ///
98    /// This method accepts something that can be turned into an iterator that
99    /// yields elements that can be represented by a `&[u8]`.
100    ///
101    /// This may be called with an empty iterator, which will cause a record
102    /// terminator to be written. If no fields had been written, then a single
103    /// empty field is written before the terminator.
104    ///
105    /// # Example
106    ///
107    /// ```
108    /// use std::error::Error;
109    /// use csv_async::AsyncWriter;
110    ///
111    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
112    /// async fn example() -> Result<(), Box<dyn Error>> {
113    ///     let mut wtr = AsyncWriter::from_writer(vec![]);
114    ///     wtr.write_record(&["a", "b", "c"]).await?;
115    ///     wtr.write_record(&["x", "y", "z"]).await?;
116    ///
117    ///     let data = String::from_utf8(wtr.into_inner().await?)?;
118    ///     assert_eq!(data, "a,b,c\nx,y,z\n");
119    ///     Ok(())
120    /// }
121    /// ```
122    #[inline]
123    pub async fn write_record<I, T>(&mut self, record: I) -> Result<()>
124    where
125        I: IntoIterator<Item = T>,
126        T: AsRef<[u8]>,
127    {
128        self.0.write_record(record).await
129    }
130
131    /// Write a single `ByteRecord`.
132    ///
133    /// This method accepts a borrowed `ByteRecord` and writes its contents
134    /// to the underlying writer.
135    ///
136    /// This is similar to `write_record` except that it specifically requires
137    /// a `ByteRecord`. This permits the writer to possibly write the record
138    /// more quickly than the more generic `write_record`.
139    ///
140    /// This may be called with an empty record, which will cause a record
141    /// terminator to be written. If no fields had been written, then a single
142    /// empty field is written before the terminator.
143    ///
144    /// # Example
145    ///
146    /// ```
147    /// use std::error::Error;
148    /// use csv_async::{ByteRecord, AsyncWriter};
149    ///
150    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
151    /// async fn example() -> Result<(), Box<dyn Error>> {
152    ///     let mut wtr = AsyncWriter::from_writer(vec![]);
153    ///     wtr.write_byte_record(&ByteRecord::from(&["a", "b", "c"][..])).await?;
154    ///     wtr.write_byte_record(&ByteRecord::from(&["x", "y", "z"][..])).await?;
155    ///
156    ///     let data = String::from_utf8(wtr.into_inner().await?)?;
157    ///     assert_eq!(data, "a,b,c\nx,y,z\n");
158    ///     Ok(())
159    /// }
160    /// ```
161    #[inline]
162    pub async fn write_byte_record(&mut self, record: &ByteRecord) -> Result<()> {
163        self.0.write_byte_record(record).await
164    }
165
166    /// Write a single field.
167    ///
168    /// One should prefer using `write_record` over this method. It is provided
169    /// for cases where writing a field at a time is more convenient than
170    /// writing a record at a time.
171    ///
172    /// Note that if this API is used, `write_record` should be called with an
173    /// empty iterator to write a record terminator.
174    ///
175    /// # Example
176    ///
177    /// ```
178    /// use std::error::Error;
179    /// use csv_async::AsyncWriter;
180    ///
181    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
182    /// async fn example() -> Result<(), Box<dyn Error>> {
183    ///     let mut wtr = AsyncWriter::from_writer(vec![]);
184    ///     wtr.write_field("a").await?;
185    ///     wtr.write_field("b").await?;
186    ///     wtr.write_field("c").await?;
187    ///     wtr.write_record(None::<&[u8]>).await?;
188    ///     wtr.write_field("x").await?;
189    ///     wtr.write_field("y").await?;
190    ///     wtr.write_field("z").await?;
191    ///     wtr.write_record(None::<&[u8]>).await?;
192    ///
193    ///     let data = String::from_utf8(wtr.into_inner().await?)?;
194    ///     assert_eq!(data, "a,b,c\nx,y,z\n");
195    ///     Ok(())
196    /// }
197    /// ```
198    #[inline]
199    pub async fn write_field<T: AsRef<[u8]>>(&mut self, field: T) -> Result<()> {
200        self.0.write_field(field).await
201    }
202
203    /// Flush the contents of the internal buffer to the underlying writer.
204    ///
205    /// If there was a problem writing to the underlying writer, then an error
206    /// is returned.
207    ///
208    /// This finction is also called by writer destructor.
209    #[inline]
210    pub async fn flush(&mut self) -> io::Result<()> {
211        self.0.flush().await
212    }
213
214    /// Flush the contents of the internal buffer and return the underlying writer.
215    /// 
216    pub async fn into_inner(
217        self,
218    ) -> result::Result<W, io::Error> {
219        match self.0.into_inner().await {
220            Ok(w) => Ok(w),
221            Err(err) => Err(err.into_error()),
222        }
223    }
224}
225
#[cfg(test)]
mod tests {
    //! Unit tests for the `tokio` flavour of `AsyncWriter`.
    //!
    //! Each test writes into an in-memory sink and checks either the exact
    //! bytes produced or the error reported by the writer.

    use std::pin::Pin;
    use std::task::{Context, Poll};

    use tokio::io;

    use crate::byte_record::ByteRecord;
    use crate::error::ErrorKind;
    use crate::string_record::StringRecord;

    use super::{AsyncWriter, AsyncWriterBuilder};

    /// Consume `wtr` and return everything it wrote as a `String`.
    ///
    /// Panics if recovering the inner writer fails or the output is not
    /// valid UTF-8; both would indicate a broken test.
    async fn wtr_as_string(wtr: AsyncWriter<Vec<u8>>) -> String {
        String::from_utf8(wtr.into_inner().await.unwrap()).unwrap()
    }

    #[tokio::test]
    async fn one_record() {
        let mut wtr = AsyncWriter::from_writer(vec![]);
        wtr.write_record(&["a", "b", "c"]).await.unwrap();

        assert_eq!(wtr_as_string(wtr).await, "a,b,c\n");
    }

    #[tokio::test]
    async fn one_string_record() {
        let mut wtr = AsyncWriter::from_writer(vec![]);
        wtr.write_record(&StringRecord::from(vec!["a", "b", "c"])).await.unwrap();

        assert_eq!(wtr_as_string(wtr).await, "a,b,c\n");
    }

    #[tokio::test]
    async fn one_byte_record() {
        let mut wtr = AsyncWriter::from_writer(vec![]);
        wtr.write_record(&ByteRecord::from(vec!["a", "b", "c"])).await.unwrap();

        assert_eq!(wtr_as_string(wtr).await, "a,b,c\n");
    }

    #[tokio::test]
    async fn raw_one_byte_record() {
        let mut wtr = AsyncWriter::from_writer(vec![]);
        wtr.write_byte_record(&ByteRecord::from(vec!["a", "b", "c"])).await.unwrap();

        assert_eq!(wtr_as_string(wtr).await, "a,b,c\n");
    }

    #[tokio::test]
    async fn one_empty_record() {
        let mut wtr = AsyncWriter::from_writer(vec![]);
        wtr.write_record(&[""]).await.unwrap();

        assert_eq!(wtr_as_string(wtr).await, "\"\"\n");
    }

    #[tokio::test]
    async fn raw_one_empty_record() {
        let mut wtr = AsyncWriter::from_writer(vec![]);
        wtr.write_byte_record(&ByteRecord::from(vec![""])).await.unwrap();

        assert_eq!(wtr_as_string(wtr).await, "\"\"\n");
    }

    #[tokio::test]
    async fn two_empty_records() {
        let mut wtr = AsyncWriter::from_writer(vec![]);
        wtr.write_record(&[""]).await.unwrap();
        wtr.write_record(&[""]).await.unwrap();

        assert_eq!(wtr_as_string(wtr).await, "\"\"\n\"\"\n");
    }

    #[tokio::test]
    async fn raw_two_empty_records() {
        let mut wtr = AsyncWriter::from_writer(vec![]);
        wtr.write_byte_record(&ByteRecord::from(vec![""])).await.unwrap();
        wtr.write_byte_record(&ByteRecord::from(vec![""])).await.unwrap();

        assert_eq!(wtr_as_string(wtr).await, "\"\"\n\"\"\n");
    }

    #[tokio::test]
    async fn unequal_records_bad() {
        let mut wtr = AsyncWriter::from_writer(vec![]);
        wtr.write_record(&ByteRecord::from(vec!["a", "b", "c"])).await.unwrap();
        let err = wtr.write_record(&ByteRecord::from(vec!["a"])).await.unwrap_err();
        match *err.kind() {
            ErrorKind::UnequalLengths { ref pos, expected_len, len } => {
                assert!(pos.is_none());
                assert_eq!(expected_len, 3);
                assert_eq!(len, 1);
            }
            ref x => {
                panic!("expected UnequalLengths error, but got '{:?}'", x);
            }
        }
    }

    #[tokio::test]
    async fn raw_unequal_records_bad() {
        let mut wtr = AsyncWriter::from_writer(vec![]);
        wtr.write_byte_record(&ByteRecord::from(vec!["a", "b", "c"])).await.unwrap();
        let err =
            wtr.write_byte_record(&ByteRecord::from(vec!["a"])).await.unwrap_err();
        match *err.kind() {
            ErrorKind::UnequalLengths { ref pos, expected_len, len } => {
                assert!(pos.is_none());
                assert_eq!(expected_len, 3);
                assert_eq!(len, 1);
            }
            ref x => {
                panic!("expected UnequalLengths error, but got '{:?}'", x);
            }
        }
    }

    #[tokio::test]
    async fn unequal_records_ok() {
        let mut wtr = AsyncWriterBuilder::new().flexible(true).create_writer(vec![]);
        wtr.write_record(&ByteRecord::from(vec!["a", "b", "c"])).await.unwrap();
        wtr.write_record(&ByteRecord::from(vec!["a"])).await.unwrap();
        assert_eq!(wtr_as_string(wtr).await, "a,b,c\na\n");
    }

    #[tokio::test]
    async fn raw_unequal_records_ok() {
        let mut wtr = AsyncWriterBuilder::new().flexible(true).create_writer(vec![]);
        wtr.write_byte_record(&ByteRecord::from(vec!["a", "b", "c"])).await.unwrap();
        wtr.write_byte_record(&ByteRecord::from(vec!["a"])).await.unwrap();
        assert_eq!(wtr_as_string(wtr).await, "a,b,c\na\n");
    }

    #[tokio::test]
    async fn full_buffer_should_not_flush_underlying() {
        /// In-memory sink that records how it was driven: every write is
        /// bracketed as `>…<` and every flush is recorded as `!`.
        #[derive(Debug)]
        struct MarkWriteAndFlush(Vec<u8>);

        impl MarkWriteAndFlush {
            /// Consume the sink and return the recorded trace as text.
            fn into_string(self) -> String {
                String::from_utf8(self.0).unwrap()
            }
        }

        impl io::AsyncWrite for MarkWriteAndFlush {
            fn poll_write(
                mut self: Pin<&mut Self>,
                _: &mut Context<'_>,
                buf: &[u8],
            ) -> Poll<Result<usize, io::Error>> {
                // Appending to a `Vec` cannot fail or write short, so the
                // markers and the payload are pushed directly instead of
                // going through `io::Write::write` and dropping its count.
                self.0.push(b'>');
                self.0.extend_from_slice(buf);
                self.0.push(b'<');
                // Report only the payload length, not the two marker bytes:
                // AsyncWriteExt::write_all panics if write returns more than
                // buf.len().
                Poll::Ready(Ok(buf.len()))
            }

            fn poll_flush(
                mut self: Pin<&mut Self>,
                _: &mut Context<'_>,
            ) -> Poll<Result<(), io::Error>> {
                self.0.push(b'!');
                Poll::Ready(Ok(()))
            }

            fn poll_shutdown(
                self: Pin<&mut Self>,
                cx: &mut Context<'_>,
            ) -> Poll<Result<(), io::Error>> {
                self.poll_flush(cx)
            }
        }

        let underlying = MarkWriteAndFlush(vec![]);
        let mut wtr =
            AsyncWriterBuilder::new().buffer_capacity(4).create_writer(underlying);

        wtr.write_byte_record(&ByteRecord::from(vec!["a", "b"])).await.unwrap();
        wtr.write_byte_record(&ByteRecord::from(vec!["c", "d"])).await.unwrap();
        wtr.flush().await.unwrap();
        wtr.write_byte_record(&ByteRecord::from(vec!["e", "f"])).await.unwrap();

        let got = wtr.into_inner().await.unwrap().into_string();

        // As the buffer size is 4 we should write each record separately, and
        // flush when explicitly called and implicitly in into_inner.
        assert_eq!(got, ">a,b\n<>c,d\n<!>e,f\n<!");
    }
}