csv_async/async_writers/mod.rs
1use std::result;
2
3use csv_core::WriterBuilder as CoreWriterBuilder;
4use csv_core::{self, WriteResult, Writer as CoreWriter};
5cfg_if::cfg_if! {
6if #[cfg(feature = "tokio")] {
7 use tokio::io::{self, AsyncWrite, AsyncWriteExt};
8} else {
9 use futures::io::{self, AsyncWrite, AsyncWriteExt};
10}}
11
12
13use crate::{QuoteStyle, Terminator};
14use crate::byte_record::ByteRecord;
15use crate::error::{Error, ErrorKind, IntoInnerError, Result};
16
17#[cfg(feature = "with_serde")]
18pub mod mwtr_serde;
19
20cfg_if::cfg_if! {
21if #[cfg(feature = "tokio")] {
22 pub mod awtr_tokio;
23} else {
24 pub mod awtr_futures;
25}}
26
27#[cfg(all(feature = "with_serde", not(feature = "tokio")))]
28pub mod aser_futures;
29
30#[cfg(all(feature = "with_serde", feature = "tokio"))]
31pub mod aser_tokio;
32
33//-//////////////////////////////////////////////////////////////////////////////////////////////
34//-// Builder
35//-//////////////////////////////////////////////////////////////////////////////////////////////
36
/// Builds a CSV writer with various configuration knobs.
///
/// This builder can be used to tweak the field delimiter, record terminator
/// and more. Once a CSV `AsyncWriter` is built, its configuration cannot be
/// changed.
#[derive(Debug)]
pub struct AsyncWriterBuilder {
    /// Low-level `csv_core` builder. It receives the delimiter, terminator,
    /// quote style, quote byte, double-quote, escape and comment settings.
    builder: CoreWriterBuilder,
    /// Size in bytes of the writer's internal buffer (8 KiB by default).
    capacity: usize,
    /// When `true`, records may contain differing numbers of fields.
    flexible: bool,
    /// Whether a header row should be written when serializing. The flag is
    /// only stored in this module; presumably the serde-based writers read
    /// it (TODO confirm).
    has_headers: bool,
}
49
50impl Default for AsyncWriterBuilder {
51 fn default() -> AsyncWriterBuilder {
52 AsyncWriterBuilder {
53 builder: CoreWriterBuilder::default(),
54 capacity: 8 * (1 << 10),
55 flexible: false,
56 has_headers: true,
57 }
58 }
59}
60
61impl AsyncWriterBuilder {
62 /// Create a new builder for configuring CSV writing.
63 ///
64 /// To convert a builder into a writer, call one of the methods starting
65 /// with `from_`.
66 ///
67 /// # Example
68 ///
69 /// ```
70 /// use std::error::Error;
71 /// use csv_async::AsyncWriterBuilder;
72 ///
73 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
74 /// async fn example() -> Result<(), Box<dyn Error>> {
75 /// let mut wtr = AsyncWriterBuilder::new().create_writer(vec![]);
76 /// wtr.write_record(&["a", "b", "c"]).await?;
77 /// wtr.write_record(&["x", "y", "z"]).await?;
78 ///
79 /// let data = String::from_utf8(wtr.into_inner().await?)?;
80 /// assert_eq!(data, "a,b,c\nx,y,z\n");
81 /// Ok(())
82 /// }
83 /// ```
84 pub fn new() -> AsyncWriterBuilder {
85 AsyncWriterBuilder::default()
86 }
87
88 /// The field delimiter to use when writing CSV.
89 ///
90 /// The default is `b','`.
91 ///
92 /// # Example
93 ///
94 /// ```
95 /// use std::error::Error;
96 /// use csv_async::AsyncWriterBuilder;
97 ///
98 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
99 /// async fn example() -> Result<(), Box<dyn Error>> {
100 /// let mut wtr = AsyncWriterBuilder::new()
101 /// .delimiter(b';')
102 /// .create_writer(vec![]);
103 /// wtr.write_record(&["a", "b", "c"]).await?;
104 /// wtr.write_record(&["x", "y", "z"]).await?;
105 ///
106 /// let data = String::from_utf8(wtr.into_inner().await?)?;
107 /// assert_eq!(data, "a;b;c\nx;y;z\n");
108 /// Ok(())
109 /// }
110 /// ```
111 pub fn delimiter(&mut self, delimiter: u8) -> &mut AsyncWriterBuilder {
112 self.builder.delimiter(delimiter);
113 self
114 }
115 /// Whether to write a header row before writing any other row.
116 ///
117 /// When this is enabled and the `serialize` method is used to write data
118 /// with something that contains field names (i.e., a struct), then a
119 /// header row is written containing the field names before any other row
120 /// is written.
121 ///
122 /// This option has no effect when using other methods to write rows. That
123 /// is, if you don't use `serialize`, then you must write your header row
124 /// explicitly if you want a header row.
125 ///
126 /// This is enabled by default.
127 ///
128 // / # Example: with headers
129 // /
130 // / This shows how the header will be automatically written from the field
131 // / names of a struct.
132 // /
133 // / ```
134 // / use std::error::Error;
135 // /
136 // / use csv::WriterBuilder;
137 // / use serde::Serialize;
138 // /
139 // / #[derive(Serialize)]
140 // / struct Row<'a> {
141 // / city: &'a str,
142 // / country: &'a str,
143 // / // Serde allows us to name our headers exactly,
144 // / // even if they don't match our struct field names.
145 // / #[serde(rename = "popcount")]
146 // / population: u64,
147 // / }
148 // /
149 // / # fn main() { example().unwrap(); }
150 // / fn example() -> Result<(), Box<dyn Error>> {
151 // / let mut wtr = WriterBuilder::new().from_writer(vec![]);
152 // / wtr.serialize(Row {
153 // / city: "Boston",
154 // / country: "United States",
155 // / population: 4628910,
156 // / })?;
157 // / wtr.serialize(Row {
158 // / city: "Concord",
159 // / country: "United States",
160 // / population: 42695,
161 // / })?;
162 // /
163 // / let data = String::from_utf8(wtr.into_inner()?)?;
164 // / assert_eq!(data, "\
165 // / city,country,popcount
166 // / Boston,United States,4628910
167 // / Concord,United States,42695
168 // / ");
169 // / Ok(())
170 // / }
171 // / ```
172 // /
173 // / # Example: without headers
174 // /
175 // / This shows that serializing things that aren't structs (in this case,
176 // / a tuple struct) won't result in a header row being written. This means
177 // / you usually don't need to set `has_headers(false)` unless you
178 // / explicitly want to both write custom headers and serialize structs.
179 // /
180 // / ```
181 // / use std::error::Error;
182 // / use csv::WriterBuilder;
183 // /
184 // / # fn main() { example().unwrap(); }
185 // / fn example() -> Result<(), Box<dyn Error>> {
186 // / let mut wtr = WriterBuilder::new().from_writer(vec![]);
187 // / wtr.serialize(("Boston", "United States", 4628910))?;
188 // / wtr.serialize(("Concord", "United States", 42695))?;
189 // /
190 // / let data = String::from_utf8(wtr.into_inner()?)?;
191 // / assert_eq!(data, "\
192 // / Boston,United States,4628910
193 // / Concord,United States,42695
194 // / ");
195 // / Ok(())
196 // / }
197 // / ```
198 pub fn has_headers(&mut self, yes: bool) -> &mut AsyncWriterBuilder {
199 self.has_headers = yes;
200 self
201 }
202
203 /// Whether the number of fields in records is allowed to change or not.
204 ///
205 /// When disabled (which is the default), writing CSV data will return an
206 /// error if a record is written with a number of fields different from the
207 /// number of fields written in a previous record.
208 ///
209 /// When enabled, this error checking is turned off.
210 ///
211 /// # Example: writing flexible records
212 ///
213 /// ```
214 /// use std::error::Error;
215 /// use csv_async::AsyncWriterBuilder;
216 ///
217 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
218 /// async fn example() -> Result<(), Box<dyn Error>> {
219 /// let mut wtr = AsyncWriterBuilder::new()
220 /// .flexible(true)
221 /// .create_writer(vec![]);
222 /// wtr.write_record(&["a", "b"]).await?;
223 /// wtr.write_record(&["x", "y", "z"]).await?;
224 ///
225 /// let data = String::from_utf8(wtr.into_inner().await?)?;
226 /// assert_eq!(data, "a,b\nx,y,z\n");
227 /// Ok(())
228 /// }
229 /// ```
230 ///
231 /// # Example: error when `flexible` is disabled
232 ///
233 /// ```
234 /// use std::error::Error;
235 /// use csv_async::AsyncWriterBuilder;
236 ///
237 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
238 /// async fn example() -> Result<(), Box<dyn Error>> {
239 /// let mut wtr = AsyncWriterBuilder::new()
240 /// .flexible(false)
241 /// .create_writer(vec![]);
242 /// wtr.write_record(&["a", "b"]).await?;
243 /// let err = wtr.write_record(&["x", "y", "z"]).await.unwrap_err();
244 /// match *err.kind() {
245 /// csv_async::ErrorKind::UnequalLengths { expected_len, len, .. } => {
246 /// assert_eq!(expected_len, 2);
247 /// assert_eq!(len, 3);
248 /// }
249 /// ref wrong => {
250 /// panic!("expected UnequalLengths but got {:?}", wrong);
251 /// }
252 /// }
253 /// Ok(())
254 /// }
255 /// ```
256 pub fn flexible(&mut self, yes: bool) -> &mut AsyncWriterBuilder {
257 self.flexible = yes;
258 self
259 }
260
261 /// The record terminator to use when writing CSV.
262 ///
263 /// A record terminator can be any single byte. The default is `\n`.
264 ///
265 /// Note that RFC 4180 specifies that record terminators should be `\r\n`.
266 /// To use `\r\n`, use the special `Terminator::CRLF` value.
267 ///
268 /// # Example: CRLF
269 ///
270 /// This shows how to use RFC 4180 compliant record terminators.
271 ///
272 /// ```
273 /// use std::error::Error;
274 /// use csv_async::{Terminator, AsyncWriterBuilder};
275 ///
276 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
277 /// async fn example() -> Result<(), Box<dyn Error>> {
278 /// let mut wtr = AsyncWriterBuilder::new()
279 /// .terminator(Terminator::CRLF)
280 /// .create_writer(vec![]);
281 /// wtr.write_record(&["a", "b", "c"]).await?;
282 /// wtr.write_record(&["x", "y", "z"]).await?;
283 ///
284 /// let data = String::from_utf8(wtr.into_inner().await?)?;
285 /// assert_eq!(data, "a,b,c\r\nx,y,z\r\n");
286 /// Ok(())
287 /// }
288 /// ```
289 pub fn terminator(&mut self, term: Terminator) -> &mut AsyncWriterBuilder {
290 self.builder.terminator(term.to_core());
291 self
292 }
293
294 /// The quoting style to use when writing CSV.
295 ///
296 /// By default, this is set to `QuoteStyle::Necessary`, which will only
297 /// use quotes when they are necessary to preserve the integrity of data.
298 ///
299 /// Note that unless the quote style is set to `Never`, an empty field is
300 /// quoted if it is the only field in a record.
301 ///
302 /// # Example: non-numeric quoting
303 ///
304 /// This shows how to quote non-numeric fields only.
305 ///
306 /// ```
307 /// use std::error::Error;
308 /// use csv_async::{QuoteStyle, AsyncWriterBuilder};
309 ///
310 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
311 /// async fn example() -> Result<(), Box<dyn Error>> {
312 /// let mut wtr = AsyncWriterBuilder::new()
313 /// .quote_style(QuoteStyle::NonNumeric)
314 /// .create_writer(vec![]);
315 /// wtr.write_record(&["a", "5", "c"]).await?;
316 /// wtr.write_record(&["3.14", "y", "z"]).await?;
317 ///
318 /// let data = String::from_utf8(wtr.into_inner().await?)?;
319 /// assert_eq!(data, "\"a\",5,\"c\"\n3.14,\"y\",\"z\"\n");
320 /// Ok(())
321 /// }
322 /// ```
323 ///
324 /// # Example: never quote
325 ///
326 /// This shows how the CSV writer can be made to never write quotes, even
327 /// if it sacrifices the integrity of the data.
328 ///
329 /// ```
330 /// use std::error::Error;
331 /// use csv_async::{QuoteStyle, AsyncWriterBuilder};
332 ///
333 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
334 /// async fn example() -> Result<(), Box<dyn Error>> {
335 /// let mut wtr = AsyncWriterBuilder::new()
336 /// .quote_style(QuoteStyle::Never)
337 /// .create_writer(vec![]);
338 /// wtr.write_record(&["a", "foo\nbar", "c"]).await?;
339 /// wtr.write_record(&["g\"h\"i", "y", "z"]).await?;
340 ///
341 /// let data = String::from_utf8(wtr.into_inner().await?)?;
342 /// assert_eq!(data, "a,foo\nbar,c\ng\"h\"i,y,z\n");
343 /// Ok(())
344 /// }
345 /// ```
346 pub fn quote_style(&mut self, style: QuoteStyle) -> &mut AsyncWriterBuilder {
347 self.builder.quote_style(style.to_core());
348 self
349 }
350
351 /// The quote character to use when writing CSV.
352 ///
353 /// The default is `b'"'`.
354 ///
355 /// # Example
356 ///
357 /// ```
358 /// use std::error::Error;
359 /// use csv_async::AsyncWriterBuilder;
360 ///
361 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
362 /// async fn example() -> Result<(), Box<dyn Error>> {
363 /// let mut wtr = AsyncWriterBuilder::new()
364 /// .quote(b'\'')
365 /// .create_writer(vec![]);
366 /// wtr.write_record(&["a", "foo\nbar", "c"]).await?;
367 /// wtr.write_record(&["g'h'i", "y\"y\"y", "z"]).await?;
368 ///
369 /// let data = String::from_utf8(wtr.into_inner().await?)?;
370 /// assert_eq!(data, "a,'foo\nbar',c\n'g''h''i',y\"y\"y,z\n");
371 /// Ok(())
372 /// }
373 /// ```
374 pub fn quote(&mut self, quote: u8) -> &mut AsyncWriterBuilder {
375 self.builder.quote(quote);
376 self
377 }
378
379 /// Enable double quote escapes.
380 ///
381 /// This is enabled by default, but it may be disabled. When disabled,
382 /// quotes in field data are escaped instead of doubled.
383 ///
384 /// # Example
385 ///
386 /// ```
387 /// use std::error::Error;
388 /// use csv_async::AsyncWriterBuilder;
389 ///
390 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
391 /// async fn example() -> Result<(), Box<dyn Error>> {
392 /// let mut wtr = AsyncWriterBuilder::new()
393 /// .double_quote(false)
394 /// .create_writer(vec![]);
395 /// wtr.write_record(&["a", "foo\"bar", "c"]).await?;
396 /// wtr.write_record(&["x", "y", "z"]).await?;
397 ///
398 /// let data = String::from_utf8(wtr.into_inner().await?)?;
399 /// assert_eq!(data, "a,\"foo\\\"bar\",c\nx,y,z\n");
400 /// Ok(())
401 /// }
402 /// ```
403 pub fn double_quote(&mut self, yes: bool) -> &mut AsyncWriterBuilder {
404 self.builder.double_quote(yes);
405 self
406 }
407
408 /// The escape character to use when writing CSV.
409 ///
410 /// In some variants of CSV, quotes are escaped using a special escape
411 /// character like `\` (instead of escaping quotes by doubling them).
412 ///
413 /// By default, writing these idiosyncratic escapes is disabled, and is
414 /// only used when `double_quote` is disabled.
415 ///
416 /// # Example
417 ///
418 /// ```
419 /// use std::error::Error;
420 /// use csv_async::AsyncWriterBuilder;
421 ///
422 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
423 /// async fn example() -> Result<(), Box<dyn Error>> {
424 /// let mut wtr = AsyncWriterBuilder::new()
425 /// .double_quote(false)
426 /// .escape(b'$')
427 /// .create_writer(vec![]);
428 /// wtr.write_record(&["a", "foo\"bar", "c"]).await?;
429 /// wtr.write_record(&["x", "y", "z"]).await?;
430 ///
431 /// let data = String::from_utf8(wtr.into_inner().await?)?;
432 /// assert_eq!(data, "a,\"foo$\"bar\",c\nx,y,z\n");
433 /// Ok(())
434 /// }
435 /// ```
436 pub fn escape(&mut self, escape: u8) -> &mut AsyncWriterBuilder {
437 self.builder.escape(escape);
438 self
439 }
440
    /// The comment character that will be used when the written data is later read back.
442 ///
443 /// If `quote_style` is set to `QuoteStyle::Necessary`, a field will
444 /// be quoted if the comment character is detected anywhere in the field.
445 ///
446 /// The default value is None.
447 ///
448 /// # Example
449 ///
450 /// ```
451 /// use std::error::Error;
452 /// use csv_async::AsyncWriterBuilder;
453 ///
454 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
455 /// async fn example() -> Result<(), Box<dyn Error>> {
456 /// let mut wtr =
457 /// AsyncWriterBuilder::new().comment(Some(b'#')).create_writer(Vec::new());
458 /// wtr.write_record(&["# comment", "another"]).await?;
459 /// let buf = wtr.into_inner().await?;
460 /// assert_eq!(String::from_utf8(buf).unwrap(), "\"# comment\",another\n");
461 /// Ok(())
462 /// }
463 /// ```
464 pub fn comment(&mut self, comment: Option<u8>) -> &mut AsyncWriterBuilder {
465 self.builder.comment(comment);
466 self
467 }
468
469 /// Set the capacity (in bytes) of the internal buffer used in the CSV
470 /// writer. This defaults to a reasonable setting.
471 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut AsyncWriterBuilder {
472 self.capacity = capacity;
473 self
474 }
475}
476
477//-//////////////////////////////////////////////////////////////////////////////////////////////
478//-// Writer
479//-//////////////////////////////////////////////////////////////////////////////////////////////
480
/// Bookkeeping the writer keeps alongside the `csv_core` writer: record
/// length validation and flush tracking.
#[derive(Debug)]
struct WriterState {
    /// Whether inconsistent record lengths are allowed.
    flexible: bool,
    /// The number of fields written in the first record. This is compared
    /// with `fields_written` on all subsequent records to check for
    /// inconsistent record lengths. `None` until the first record has been
    /// terminated with `flexible` disabled.
    first_field_count: Option<u64>,
    /// The number of fields written in this record. This is used to report
    /// errors for inconsistent record lengths if `flexible` is disabled.
    /// Reset to zero each time a record terminator is written.
    fields_written: u64,
    /// This is set immediately before the buffer is written to the inner
    /// writer and unset immediately after. `Drop` skips its flush while this
    /// is set, which avoids flushing the buffer twice if the inner writer
    /// panics.
    panicked: bool,
}
497
/// Fixed-size scratch space sitting between `csv_core` and the inner writer.
///
/// `csv_core` produces its output into a caller-supplied `&mut [u8]`, which
/// the `std::io::BufWriter` API does not expose, so the writer keeps its own
/// pre-allocated byte vector and tracks how much of it is filled.
#[derive(Debug)]
struct Buffer {
    /// Backing storage; its length is the total capacity of the buffer.
    buf: Vec<u8>,
    /// How many bytes at the front of `buf` hold pending output.
    len: usize,
}

impl Buffer {
    /// Borrows the pending bytes at the front of the buffer.
    ///
    /// The slice is empty when nothing has been written since the last
    /// `clear`.
    #[inline]
    fn readable(&self) -> &[u8] {
        let (filled, _) = self.buf.split_at(self.len);
        filled
    }

    /// Mutably borrows the unused tail of the buffer.
    ///
    /// The slice is empty when the buffer is full.
    #[inline]
    fn writable(&mut self) -> &mut [u8] {
        let (_, spare) = self.buf.split_at_mut(self.len);
        spare
    }

    /// Records that the first `n` bytes of the `writable` region now hold
    /// data.
    #[inline]
    fn written(&mut self, n: usize) {
        self.len += n;
    }

    /// Marks the whole buffer as unused again; the bytes are not zeroed.
    #[inline]
    fn clear(&mut self) {
        self.len = 0;
    }
}
539
/// CSV async writer internal implementation used by both record writer and serializer.
///
/// Output is first encoded into an internal buffer and written to the
/// underlying writer when the buffer fills up or on an explicit flush.
#[derive(Debug)]
pub struct AsyncWriterImpl<W: AsyncWrite + Unpin> {
    /// The `csv_core` writer that performs quoting and escaping.
    core: CoreWriter,
    /// The underlying writer. `into_inner` takes it out, leaving `None`, so
    /// that `Drop` does not attempt another flush.
    wtr: Option<W>,
    /// Encoded bytes that have not yet been handed to `wtr`.
    buf: Buffer,
    /// Field-count validation and flush bookkeeping.
    state: WriterState,
}
549
550impl<W: AsyncWrite + Unpin> Drop for AsyncWriterImpl<W> {
551 fn drop(&mut self) {
552 if self.wtr.is_some() && !self.state.panicked {
553 // We ignore result of flush() call while dropping
554 // Well known problem.
555 // If you care about flush result call it explicitly
556 // before AsyncWriter goes out of scope,
557 // second flush() call should be no op.
558 let _ = futures::executor::block_on(self.flush());
559 }
560 }
561}
562
563impl<W: AsyncWrite + Unpin> AsyncWriterImpl<W> {
564 fn new(builder: &AsyncWriterBuilder, wtr: W) -> AsyncWriterImpl<W> {
565 AsyncWriterImpl {
566 core: builder.builder.build(),
567 wtr: Some(wtr),
568 buf: Buffer { buf: vec![0; builder.capacity], len: 0 },
569 state: WriterState {
570 flexible: builder.flexible,
571 first_field_count: None,
572 fields_written: 0,
573 panicked: false,
574 },
575 }
576 }
577
578 /// Write a single record.
579 ///
580 pub async fn write_record<I, T>(&mut self, record: I) -> Result<()>
581 where
582 I: IntoIterator<Item = T>,
583 T: AsRef<[u8]>,
584 {
585 for field in record.into_iter() {
586 self.write_field_impl(field).await?;
587 }
588 self.write_terminator().await
589 }
590
591 /// Write a single `ByteRecord`.
592 ///
593 #[inline(never)]
594 pub async fn write_byte_record(&mut self, record: &ByteRecord) -> Result<()> {
595 if record.as_slice().is_empty() {
596 return self.write_record(record).await;
597 }
598 // The idea here is to find a fast path for shuffling our record into
599 // our buffer as quickly as possible. We do this because the underlying
600 // "core" CSV writer does a lot of book-keeping to maintain its state
601 // oriented API.
602 //
603 // The fast path occurs when we know our record will fit in whatever
604 // space we have left in our buffer. We can actually quickly compute
605 // the upper bound on the space required:
606 let upper_bound =
607 // The data itself plus the worst case: every byte is a quote.
608 (2 * record.as_slice().len())
609 // The number of field delimiters.
610 + (record.len().saturating_sub(1))
611 // The maximum number of quotes inserted around each field.
612 + (2 * record.len())
613 // The maximum number of bytes for the terminator.
614 + 2;
615 if self.buf.writable().len() < upper_bound {
616 return self.write_record(record).await;
617 }
618 let mut first = true;
619 for field in record.iter() {
620 if !first {
621 self.buf.writable()[0] = self.core.get_delimiter();
622 self.buf.written(1);
623 }
624 first = false;
625
626 if !self.core.should_quote(field) {
627 self.buf.writable()[..field.len()].copy_from_slice(field);
628 self.buf.written(field.len());
629 } else {
630 self.buf.writable()[0] = self.core.get_quote();
631 self.buf.written(1);
632 let (res, nin, nout) = csv_core::quote(
633 field,
634 self.buf.writable(),
635 self.core.get_quote(),
636 self.core.get_escape(),
637 self.core.get_double_quote(),
638 );
639 debug_assert!(res == WriteResult::InputEmpty);
640 debug_assert!(nin == field.len());
641 self.buf.written(nout);
642 self.buf.writable()[0] = self.core.get_quote();
643 self.buf.written(1);
644 }
645 }
646 self.state.fields_written = record.len() as u64;
647 self.write_terminator_into_buffer()
648 }
649
650 /// Write a single field.
651 ///
652 pub async fn write_field<T: AsRef<[u8]>>(&mut self, field: T) -> Result<()> {
653 self.write_field_impl(field).await
654 }
655
656 /// Implementation of write_field.
657 ///
658 /// This is a separate method so we can force the compiler to inline it
659 /// into write_record.
660 #[inline(always)]
661 async fn write_field_impl<T: AsRef<[u8]>>(&mut self, field: T) -> Result<()> {
662 if self.state.fields_written > 0 {
663 self.write_delimiter().await?;
664 }
665 let mut field = field.as_ref();
666 loop {
667 let (res, nin, nout) = self.core.field(field, self.buf.writable());
668 field = &field[nin..];
669 self.buf.written(nout);
670 match res {
671 WriteResult::InputEmpty => {
672 self.state.fields_written += 1;
673 return Ok(());
674 }
675 WriteResult::OutputFull => self.flush_buf().await?,
676 }
677 }
678 }
679
680 /// Flush the contents of the internal buffer to the underlying writer.
681 ///
682 /// If there was a problem writing to the underlying writer, then an error
683 /// is returned.
684 ///
685 /// Note that this also flushes the underlying writer.
686 pub async fn flush(&mut self) -> io::Result<()> {
687 self.flush_buf().await?;
688 self.wtr.as_mut().unwrap().flush().await?;
689 Ok(())
690 }
691
692 /// Flush the contents of the internal buffer to the underlying writer,
693 /// without flushing the underlying writer.
694 async fn flush_buf(&mut self) -> io::Result<()> {
695 self.state.panicked = true;
696 let result = self.wtr.as_mut().unwrap().write_all(self.buf.readable()).await;
697 self.state.panicked = false;
698 result?;
699 self.buf.clear();
700 Ok(())
701 }
702
703 /// Flush the contents of the internal buffer and return the underlying
704 /// writer.
705 pub async fn into_inner(
706 mut self,
707 ) -> result::Result<W, IntoInnerError<AsyncWriterImpl<W>>> {
708 match self.flush().await {
709 Ok(()) => Ok(self.wtr.take().unwrap()),
710 Err(err) => Err(IntoInnerError::new(self, err)),
711 }
712 }
713
714 /// Write a CSV delimiter.
715 async fn write_delimiter(&mut self) -> Result<()> {
716 loop {
717 let (res, nout) = self.core.delimiter(self.buf.writable());
718 self.buf.written(nout);
719 match res {
720 WriteResult::InputEmpty => return Ok(()),
721 WriteResult::OutputFull => self.flush_buf().await?,
722 }
723 }
724 }
725
726 /// Write a CSV terminator.
727 async fn write_terminator(&mut self) -> Result<()> {
728 self.check_field_count()?;
729 loop {
730 let (res, nout) = self.core.terminator(self.buf.writable());
731 self.buf.written(nout);
732 match res {
733 WriteResult::InputEmpty => {
734 self.state.fields_written = 0;
735 return Ok(());
736 }
737 WriteResult::OutputFull => self.flush_buf().await?,
738 }
739 }
740 }
741
742 /// Write a CSV terminator that is guaranteed to fit into the current buffer.
743 ///
744 #[inline(never)]
745 fn write_terminator_into_buffer(&mut self) -> Result<()> {
746 self.check_field_count()?;
747 match self.core.get_terminator() {
748 csv_core::Terminator::CRLF => {
749 self.buf.writable()[0] = b'\r';
750 self.buf.writable()[1] = b'\n';
751 self.buf.written(2);
752 }
753 csv_core::Terminator::Any(b) => {
754 self.buf.writable()[0] = b;
755 self.buf.written(1);
756 }
757 _ => unreachable!(),
758 }
759 self.state.fields_written = 0;
760 Ok(())
761 }
762
763 fn check_field_count(&mut self) -> Result<()> {
764 if !self.state.flexible {
765 match self.state.first_field_count {
766 None => {
767 self.state.first_field_count =
768 Some(self.state.fields_written);
769 }
770 Some(expected) if expected != self.state.fields_written => {
771 return Err(Error::new(ErrorKind::UnequalLengths {
772 pos: None,
773 expected_len: expected,
774 len: self.state.fields_written,
775 }))
776 }
777 Some(_) => {}
778 }
779 }
780 Ok(())
781 }
782}