csv_async/async_writers/awtr_tokio.rs
1use std::result;
2
3use tokio::io::{self, AsyncWrite};
4
5use crate::AsyncWriterBuilder;
6use crate::byte_record::ByteRecord;
7use crate::error::Result;
8use super::AsyncWriterImpl;
9
10impl AsyncWriterBuilder {
11 /// Build a CSV writer from this configuration that writes data to `wtr`.
12 ///
13 /// Note that the CSV writer is buffered automatically, so you should not
14 /// wrap `wtr` in a buffered writer.
15 ///
16 /// # Example
17 ///
18 /// ```
19 /// use std::error::Error;
20 /// use csv_async::AsyncWriterBuilder;
21 ///
22 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
23 /// async fn example() -> Result<(), Box<dyn Error>> {
24 /// let mut wtr = AsyncWriterBuilder::new().create_writer(vec![]);
25 /// wtr.write_record(&["a", "b", "c"]).await?;
26 /// wtr.write_record(&["x", "y", "z"]).await?;
27 ///
28 /// let data = String::from_utf8(wtr.into_inner().await?)?;
29 /// assert_eq!(data, "a,b,c\nx,y,z\n");
30 /// Ok(())
31 /// }
32 /// ```
33 pub fn create_writer<W: AsyncWrite + Unpin>(&self, wtr: W) -> AsyncWriter<W> {
34 AsyncWriter::new(self, wtr)
35 }
36}
37
38/// An already configured CSV writer for `tokio` runtime.
39///
40/// A CSV writer takes as input Rust values and writes those values in a valid
41/// CSV format as output.
42///
43/// While CSV writing is considerably easier than parsing CSV, a proper writer
44/// will do a number of things for you:
45///
46/// 1. Quote fields when necessary.
47/// 2. Check that all records have the same number of fields.
48/// 3. Write records with a single empty field correctly.
49/// 4. Use buffering intelligently and otherwise avoid allocation. (This means
50/// that callers should not do their own buffering.)
51///
52/// All of the above can be configured using a
53/// [`AsyncWriterBuilder`](struct.AsyncWriterBuilder.html).
54/// However, a `AsyncWriter` has convenient constructor (from_writer`)
55/// that use the default configuration.
56///
57/// Note that the default configuration of a `AsyncWriter` uses `\n` for record
58/// terminators instead of `\r\n` as specified by RFC 4180. Use the
59/// `terminator` method on `AsyncWriterBuilder` to set the terminator to `\r\n` if
60/// it's desired.
61#[derive(Debug)]
62pub struct AsyncWriter<W: AsyncWrite + Unpin>(AsyncWriterImpl<W>);
63
64impl<W: AsyncWrite + Unpin> AsyncWriter<W> {
65 fn new(builder: &AsyncWriterBuilder, wtr: W) -> AsyncWriter<W> {
66 AsyncWriter(AsyncWriterImpl::new(builder, wtr))
67 }
68
69 /// Build a CSV writer with a default configuration that writes data to
70 /// `wtr`.
71 ///
72 /// Note that the CSV writer is buffered automatically, so you should not
73 /// wrap `wtr` in a buffered writer.
74 ///
75 /// # Example
76 ///
77 /// ```
78 /// use std::error::Error;
79 /// use csv_async::AsyncWriter;
80 ///
81 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
82 /// async fn example() -> Result<(), Box<dyn Error>> {
83 /// let mut wtr = AsyncWriter::from_writer(vec![]);
84 /// wtr.write_record(&["a", "b", "c"]).await?;
85 /// wtr.write_record(&["x", "y", "z"]).await?;
86 ///
87 /// let data = String::from_utf8(wtr.into_inner().await?)?;
88 /// assert_eq!(data, "a,b,c\nx,y,z\n");
89 /// Ok(())
90 /// }
91 /// ```
92 pub fn from_writer(wtr: W) -> AsyncWriter<W> {
93 AsyncWriterBuilder::new().create_writer(wtr)
94 }
95
96 /// Write a single record.
97 ///
98 /// This method accepts something that can be turned into an iterator that
99 /// yields elements that can be represented by a `&[u8]`.
100 ///
101 /// This may be called with an empty iterator, which will cause a record
102 /// terminator to be written. If no fields had been written, then a single
103 /// empty field is written before the terminator.
104 ///
105 /// # Example
106 ///
107 /// ```
108 /// use std::error::Error;
109 /// use csv_async::AsyncWriter;
110 ///
111 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
112 /// async fn example() -> Result<(), Box<dyn Error>> {
113 /// let mut wtr = AsyncWriter::from_writer(vec![]);
114 /// wtr.write_record(&["a", "b", "c"]).await?;
115 /// wtr.write_record(&["x", "y", "z"]).await?;
116 ///
117 /// let data = String::from_utf8(wtr.into_inner().await?)?;
118 /// assert_eq!(data, "a,b,c\nx,y,z\n");
119 /// Ok(())
120 /// }
121 /// ```
122 #[inline]
123 pub async fn write_record<I, T>(&mut self, record: I) -> Result<()>
124 where
125 I: IntoIterator<Item = T>,
126 T: AsRef<[u8]>,
127 {
128 self.0.write_record(record).await
129 }
130
131 /// Write a single `ByteRecord`.
132 ///
133 /// This method accepts a borrowed `ByteRecord` and writes its contents
134 /// to the underlying writer.
135 ///
136 /// This is similar to `write_record` except that it specifically requires
137 /// a `ByteRecord`. This permits the writer to possibly write the record
138 /// more quickly than the more generic `write_record`.
139 ///
140 /// This may be called with an empty record, which will cause a record
141 /// terminator to be written. If no fields had been written, then a single
142 /// empty field is written before the terminator.
143 ///
144 /// # Example
145 ///
146 /// ```
147 /// use std::error::Error;
148 /// use csv_async::{ByteRecord, AsyncWriter};
149 ///
150 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
151 /// async fn example() -> Result<(), Box<dyn Error>> {
152 /// let mut wtr = AsyncWriter::from_writer(vec![]);
153 /// wtr.write_byte_record(&ByteRecord::from(&["a", "b", "c"][..])).await?;
154 /// wtr.write_byte_record(&ByteRecord::from(&["x", "y", "z"][..])).await?;
155 ///
156 /// let data = String::from_utf8(wtr.into_inner().await?)?;
157 /// assert_eq!(data, "a,b,c\nx,y,z\n");
158 /// Ok(())
159 /// }
160 /// ```
161 #[inline]
162 pub async fn write_byte_record(&mut self, record: &ByteRecord) -> Result<()> {
163 self.0.write_byte_record(record).await
164 }
165
166 /// Write a single field.
167 ///
168 /// One should prefer using `write_record` over this method. It is provided
169 /// for cases where writing a field at a time is more convenient than
170 /// writing a record at a time.
171 ///
172 /// Note that if this API is used, `write_record` should be called with an
173 /// empty iterator to write a record terminator.
174 ///
175 /// # Example
176 ///
177 /// ```
178 /// use std::error::Error;
179 /// use csv_async::AsyncWriter;
180 ///
181 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
182 /// async fn example() -> Result<(), Box<dyn Error>> {
183 /// let mut wtr = AsyncWriter::from_writer(vec![]);
184 /// wtr.write_field("a").await?;
185 /// wtr.write_field("b").await?;
186 /// wtr.write_field("c").await?;
187 /// wtr.write_record(None::<&[u8]>).await?;
188 /// wtr.write_field("x").await?;
189 /// wtr.write_field("y").await?;
190 /// wtr.write_field("z").await?;
191 /// wtr.write_record(None::<&[u8]>).await?;
192 ///
193 /// let data = String::from_utf8(wtr.into_inner().await?)?;
194 /// assert_eq!(data, "a,b,c\nx,y,z\n");
195 /// Ok(())
196 /// }
197 /// ```
198 #[inline]
199 pub async fn write_field<T: AsRef<[u8]>>(&mut self, field: T) -> Result<()> {
200 self.0.write_field(field).await
201 }
202
203 /// Flush the contents of the internal buffer to the underlying writer.
204 ///
205 /// If there was a problem writing to the underlying writer, then an error
206 /// is returned.
207 ///
208 /// This finction is also called by writer destructor.
209 #[inline]
210 pub async fn flush(&mut self) -> io::Result<()> {
211 self.0.flush().await
212 }
213
214 /// Flush the contents of the internal buffer and return the underlying writer.
215 ///
216 pub async fn into_inner(
217 self,
218 ) -> result::Result<W, io::Error> {
219 match self.0.into_inner().await {
220 Ok(w) => Ok(w),
221 Err(err) => Err(err.into_error()),
222 }
223 }
224}
225
#[cfg(test)]
mod tests {
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use tokio::io;

    use crate::byte_record::ByteRecord;
    use crate::error::ErrorKind;
    use crate::string_record::StringRecord;

    use super::{AsyncWriter, AsyncWriterBuilder};

    /// Consumes `wtr` and returns everything it wrote, decoded as UTF-8.
    async fn wtr_as_string(wtr: AsyncWriter<Vec<u8>>) -> String {
        String::from_utf8(wtr.into_inner().await.unwrap()).unwrap()
    }

    #[tokio::test]
    async fn one_record() {
        let mut wtr = AsyncWriter::from_writer(vec![]);
        wtr.write_record(&["a", "b", "c"]).await.unwrap();

        assert_eq!(wtr_as_string(wtr).await, "a,b,c\n");
    }

    #[tokio::test]
    async fn one_string_record() {
        let mut wtr = AsyncWriter::from_writer(vec![]);
        wtr.write_record(&StringRecord::from(vec!["a", "b", "c"])).await.unwrap();

        assert_eq!(wtr_as_string(wtr).await, "a,b,c\n");
    }

    #[tokio::test]
    async fn one_byte_record() {
        let mut wtr = AsyncWriter::from_writer(vec![]);
        wtr.write_record(&ByteRecord::from(vec!["a", "b", "c"])).await.unwrap();

        assert_eq!(wtr_as_string(wtr).await, "a,b,c\n");
    }

    #[tokio::test]
    async fn raw_one_byte_record() {
        let mut wtr = AsyncWriter::from_writer(vec![]);
        wtr.write_byte_record(&ByteRecord::from(vec!["a", "b", "c"])).await.unwrap();

        assert_eq!(wtr_as_string(wtr).await, "a,b,c\n");
    }

    // A record made of one empty field must be quoted, otherwise it would be
    // indistinguishable from an empty line.
    #[tokio::test]
    async fn one_empty_record() {
        let mut wtr = AsyncWriter::from_writer(vec![]);
        wtr.write_record(&[""]).await.unwrap();

        assert_eq!(wtr_as_string(wtr).await, "\"\"\n");
    }

    #[tokio::test]
    async fn raw_one_empty_record() {
        let mut wtr = AsyncWriter::from_writer(vec![]);
        wtr.write_byte_record(&ByteRecord::from(vec![""])).await.unwrap();

        assert_eq!(wtr_as_string(wtr).await, "\"\"\n");
    }

    #[tokio::test]
    async fn two_empty_records() {
        let mut wtr = AsyncWriter::from_writer(vec![]);
        wtr.write_record(&[""]).await.unwrap();
        wtr.write_record(&[""]).await.unwrap();

        assert_eq!(wtr_as_string(wtr).await, "\"\"\n\"\"\n");
    }

    #[tokio::test]
    async fn raw_two_empty_records() {
        let mut wtr = AsyncWriter::from_writer(vec![]);
        wtr.write_byte_record(&ByteRecord::from(vec![""])).await.unwrap();
        wtr.write_byte_record(&ByteRecord::from(vec![""])).await.unwrap();

        assert_eq!(wtr_as_string(wtr).await, "\"\"\n\"\"\n");
    }

    // With the default (non-flexible) configuration, a record whose length
    // differs from the first record's must be rejected.
    #[tokio::test]
    async fn unequal_records_bad() {
        let mut wtr = AsyncWriter::from_writer(vec![]);
        wtr.write_record(&ByteRecord::from(vec!["a", "b", "c"])).await.unwrap();
        let err = wtr.write_record(&ByteRecord::from(vec!["a"])).await.unwrap_err();
        match *err.kind() {
            ErrorKind::UnequalLengths { ref pos, expected_len, len } => {
                assert!(pos.is_none());
                assert_eq!(expected_len, 3);
                assert_eq!(len, 1);
            }
            ref x => {
                panic!("expected UnequalLengths error, but got '{:?}'", x);
            }
        }
    }

    #[tokio::test]
    async fn raw_unequal_records_bad() {
        let mut wtr = AsyncWriter::from_writer(vec![]);
        wtr.write_byte_record(&ByteRecord::from(vec!["a", "b", "c"])).await.unwrap();
        let err =
            wtr.write_byte_record(&ByteRecord::from(vec!["a"])).await.unwrap_err();
        match *err.kind() {
            ErrorKind::UnequalLengths { ref pos, expected_len, len } => {
                assert!(pos.is_none());
                assert_eq!(expected_len, 3);
                assert_eq!(len, 1);
            }
            ref x => {
                panic!("expected UnequalLengths error, but got '{:?}'", x);
            }
        }
    }

    // With `flexible(true)`, records of differing lengths are accepted.
    #[tokio::test]
    async fn unequal_records_ok() {
        let mut wtr = AsyncWriterBuilder::new().flexible(true).create_writer(vec![]);
        wtr.write_record(&ByteRecord::from(vec!["a", "b", "c"])).await.unwrap();
        wtr.write_record(&ByteRecord::from(vec!["a"])).await.unwrap();
        assert_eq!(wtr_as_string(wtr).await, "a,b,c\na\n");
    }

    #[tokio::test]
    async fn raw_unequal_records_ok() {
        let mut wtr = AsyncWriterBuilder::new().flexible(true).create_writer(vec![]);
        wtr.write_byte_record(&ByteRecord::from(vec!["a", "b", "c"])).await.unwrap();
        wtr.write_byte_record(&ByteRecord::from(vec!["a"])).await.unwrap();
        assert_eq!(wtr_as_string(wtr).await, "a,b,c\na\n");
    }

    #[tokio::test]
    async fn full_buffer_should_not_flush_underlying() {
        /// Sink that records every call made on it: each write is stored as
        /// `>payload<` and each flush as `!`.
        #[derive(Debug)]
        struct MarkWriteAndFlush(Vec<u8>);

        impl MarkWriteAndFlush {
            /// Consumes the sink and returns the recorded bytes as a string.
            fn into_string(self) -> String {
                String::from_utf8(self.0).unwrap()
            }
        }

        impl io::AsyncWrite for MarkWriteAndFlush {
            fn poll_write(
                mut self: Pin<&mut Self>,
                _: &mut Context<'_>,
                buf: &[u8],
            ) -> Poll<Result<usize, io::Error>> {
                // Appending to a `Vec` always stores every byte, so there is
                // no partial-write count to check or to accidentally ignore.
                self.0.extend_from_slice(b">");
                self.0.extend_from_slice(buf);
                self.0.extend_from_slice(b"<");
                // Report only the payload length, not the two marker bytes:
                // AsyncWriteExt::write_all panics if a write returns more
                // than buf.len().
                Poll::Ready(Ok(buf.len()))
            }

            fn poll_flush(
                mut self: Pin<&mut Self>,
                _: &mut Context<'_>,
            ) -> Poll<Result<(), io::Error>> {
                self.0.extend_from_slice(b"!");
                Poll::Ready(Ok(()))
            }

            fn poll_shutdown(
                self: Pin<&mut Self>,
                cx: &mut Context<'_>,
            ) -> Poll<Result<(), io::Error>> {
                self.poll_flush(cx)
            }
        }

        let underlying = MarkWriteAndFlush(vec![]);
        let mut wtr =
            AsyncWriterBuilder::new().buffer_capacity(4).create_writer(underlying);

        wtr.write_byte_record(&ByteRecord::from(vec!["a", "b"])).await.unwrap();
        wtr.write_byte_record(&ByteRecord::from(vec!["c", "d"])).await.unwrap();
        wtr.flush().await.unwrap();
        wtr.write_byte_record(&ByteRecord::from(vec!["e", "f"])).await.unwrap();

        let got = wtr.into_inner().await.unwrap().into_string();

        // As the buffer size is 4 we should write each record separately, and
        // flush when explicitly called and implicitly in into_inner.
        assert_eq!(got, ">a,b\n<>c,d\n<!>e,f\n<!");
    }
}
413}