csv_async/async_writers/
mod.rs

1use std::result;
2
3use csv_core::WriterBuilder as CoreWriterBuilder;
4use csv_core::{self, WriteResult, Writer as CoreWriter};
5cfg_if::cfg_if! {
6if #[cfg(feature = "tokio")] {
7    use tokio::io::{self, AsyncWrite, AsyncWriteExt};
8} else {
9    use futures::io::{self, AsyncWrite, AsyncWriteExt};
10}}
11    
12
13use crate::{QuoteStyle, Terminator};
14use crate::byte_record::ByteRecord;
15use crate::error::{Error, ErrorKind, IntoInnerError, Result};
16
17#[cfg(feature = "with_serde")]
18pub mod mwtr_serde;
19
20cfg_if::cfg_if! {
21if #[cfg(feature = "tokio")] {
22    pub mod awtr_tokio;
23} else {
24    pub mod awtr_futures;
25}}
26        
27#[cfg(all(feature = "with_serde", not(feature = "tokio")))]
28pub mod aser_futures;
29    
30#[cfg(all(feature = "with_serde", feature = "tokio"))]
31pub mod aser_tokio;
32
33//-//////////////////////////////////////////////////////////////////////////////////////////////
34//-// Builder
35//-//////////////////////////////////////////////////////////////////////////////////////////////
36
/// Builds a CSV writer with various configuration knobs.
///
/// This builder can be used to tweak the field delimiter, record terminator
/// and more. Once a CSV `AsyncWriter` is built, its configuration cannot be
/// changed.
#[derive(Debug)]
pub struct AsyncWriterBuilder {
    /// Low-level `csv_core` writer configuration; receives the delimiter,
    /// terminator, quoting, escape and comment settings.
    builder: CoreWriterBuilder,
    /// Size, in bytes, of the internal buffer allocated for each writer.
    capacity: usize,
    /// Whether records with differing numbers of fields are accepted.
    flexible: bool,
    /// Whether a header row should be written when serializing.
    has_headers: bool,
}
49
50impl Default for AsyncWriterBuilder {
51    fn default() -> AsyncWriterBuilder {
52        AsyncWriterBuilder {
53            builder: CoreWriterBuilder::default(),
54            capacity: 8 * (1 << 10),
55            flexible: false,
56            has_headers: true,
57        }
58    }
59}
60
61impl AsyncWriterBuilder {
62    /// Create a new builder for configuring CSV writing.
63    ///
64    /// To convert a builder into a writer, call one of the methods starting
65    /// with `from_`.
66    ///
67    /// # Example
68    ///
69    /// ```
70    /// use std::error::Error;
71    /// use csv_async::AsyncWriterBuilder;
72    ///
73    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
74    /// async fn example() -> Result<(), Box<dyn Error>> {
75    ///     let mut wtr = AsyncWriterBuilder::new().create_writer(vec![]);
76    ///     wtr.write_record(&["a", "b", "c"]).await?;
77    ///     wtr.write_record(&["x", "y", "z"]).await?;
78    ///
79    ///     let data = String::from_utf8(wtr.into_inner().await?)?;
80    ///     assert_eq!(data, "a,b,c\nx,y,z\n");
81    ///     Ok(())
82    /// }
83    /// ```
84    pub fn new() -> AsyncWriterBuilder {
85        AsyncWriterBuilder::default()
86    }
87
88    /// The field delimiter to use when writing CSV.
89    ///
90    /// The default is `b','`.
91    ///
92    /// # Example
93    ///
94    /// ```
95    /// use std::error::Error;
96    /// use csv_async::AsyncWriterBuilder;
97    ///
98    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
99    /// async fn example() -> Result<(), Box<dyn Error>> {
100    ///     let mut wtr = AsyncWriterBuilder::new()
101    ///         .delimiter(b';')
102    ///         .create_writer(vec![]);
103    ///     wtr.write_record(&["a", "b", "c"]).await?;
104    ///     wtr.write_record(&["x", "y", "z"]).await?;
105    ///
106    ///     let data = String::from_utf8(wtr.into_inner().await?)?;
107    ///     assert_eq!(data, "a;b;c\nx;y;z\n");
108    ///     Ok(())
109    /// }
110    /// ```
111    pub fn delimiter(&mut self, delimiter: u8) -> &mut AsyncWriterBuilder {
112        self.builder.delimiter(delimiter);
113        self
114    }
115    /// Whether to write a header row before writing any other row.
116    ///
117    /// When this is enabled and the `serialize` method is used to write data
118    /// with something that contains field names (i.e., a struct), then a
119    /// header row is written containing the field names before any other row
120    /// is written.
121    ///
122    /// This option has no effect when using other methods to write rows. That
123    /// is, if you don't use `serialize`, then you must write your header row
124    /// explicitly if you want a header row.
125    ///
126    /// This is enabled by default.
127    ///
128    // / # Example: with headers
129    // /
130    // / This shows how the header will be automatically written from the field
131    // / names of a struct.
132    // /
133    // / ```
134    // / use std::error::Error;
135    // /
136    // / use csv::WriterBuilder;
137    // / use serde::Serialize;
138    // /
139    // / #[derive(Serialize)]
140    // / struct Row<'a> {
141    // /     city: &'a str,
142    // /     country: &'a str,
143    // /     // Serde allows us to name our headers exactly,
144    // /     // even if they don't match our struct field names.
145    // /     #[serde(rename = "popcount")]
146    // /     population: u64,
147    // / }
148    // /
149    // / # fn main() { example().unwrap(); }
150    // / fn example() -> Result<(), Box<dyn Error>> {
151    // /     let mut wtr = WriterBuilder::new().from_writer(vec![]);
152    // /     wtr.serialize(Row {
153    // /         city: "Boston",
154    // /         country: "United States",
155    // /         population: 4628910,
156    // /     })?;
157    // /     wtr.serialize(Row {
158    // /         city: "Concord",
159    // /         country: "United States",
160    // /         population: 42695,
161    // /     })?;
162    // /
163    // /     let data = String::from_utf8(wtr.into_inner()?)?;
164    // /     assert_eq!(data, "\
165    // / city,country,popcount
166    // / Boston,United States,4628910
167    // / Concord,United States,42695
168    // / ");
169    // /     Ok(())
170    // / }
171    // / ```
172    // /
173    // / # Example: without headers
174    // /
175    // / This shows that serializing things that aren't structs (in this case,
176    // / a tuple struct) won't result in a header row being written. This means
177    // / you usually don't need to set `has_headers(false)` unless you
178    // / explicitly want to both write custom headers and serialize structs.
179    // /
180    // / ```
181    // / use std::error::Error;
182    // / use csv::WriterBuilder;
183    // /
184    // / # fn main() { example().unwrap(); }
185    // / fn example() -> Result<(), Box<dyn Error>> {
186    // /     let mut wtr = WriterBuilder::new().from_writer(vec![]);
187    // /     wtr.serialize(("Boston", "United States", 4628910))?;
188    // /     wtr.serialize(("Concord", "United States", 42695))?;
189    // /
190    // /     let data = String::from_utf8(wtr.into_inner()?)?;
191    // /     assert_eq!(data, "\
192    // / Boston,United States,4628910
193    // / Concord,United States,42695
194    // / ");
195    // /     Ok(())
196    // / }
197    // / ```
198    pub fn has_headers(&mut self, yes: bool) -> &mut AsyncWriterBuilder {
199        self.has_headers = yes;
200        self
201    }
202
203    /// Whether the number of fields in records is allowed to change or not.
204    ///
205    /// When disabled (which is the default), writing CSV data will return an
206    /// error if a record is written with a number of fields different from the
207    /// number of fields written in a previous record.
208    ///
209    /// When enabled, this error checking is turned off.
210    ///
211    /// # Example: writing flexible records
212    ///
213    /// ```
214    /// use std::error::Error;
215    /// use csv_async::AsyncWriterBuilder;
216    ///
217    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
218    /// async fn example() -> Result<(), Box<dyn Error>> {
219    ///     let mut wtr = AsyncWriterBuilder::new()
220    ///         .flexible(true)
221    ///         .create_writer(vec![]);
222    ///     wtr.write_record(&["a", "b"]).await?;
223    ///     wtr.write_record(&["x", "y", "z"]).await?;
224    ///
225    ///     let data = String::from_utf8(wtr.into_inner().await?)?;
226    ///     assert_eq!(data, "a,b\nx,y,z\n");
227    ///     Ok(())
228    /// }
229    /// ```
230    ///
231    /// # Example: error when `flexible` is disabled
232    ///
233    /// ```
234    /// use std::error::Error;
235    /// use csv_async::AsyncWriterBuilder;
236    ///
237    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
238    /// async fn example() -> Result<(), Box<dyn Error>> {
239    ///     let mut wtr = AsyncWriterBuilder::new()
240    ///         .flexible(false)
241    ///         .create_writer(vec![]);
242    ///     wtr.write_record(&["a", "b"]).await?;
243    ///     let err = wtr.write_record(&["x", "y", "z"]).await.unwrap_err();
244    ///     match *err.kind() {
245    ///         csv_async::ErrorKind::UnequalLengths { expected_len, len, .. } => {
246    ///             assert_eq!(expected_len, 2);
247    ///             assert_eq!(len, 3);
248    ///         }
249    ///         ref wrong => {
250    ///             panic!("expected UnequalLengths but got {:?}", wrong);
251    ///         }
252    ///     }
253    ///     Ok(())
254    /// }
255    /// ```
256    pub fn flexible(&mut self, yes: bool) -> &mut AsyncWriterBuilder {
257        self.flexible = yes;
258        self
259    }
260
261    /// The record terminator to use when writing CSV.
262    ///
263    /// A record terminator can be any single byte. The default is `\n`.
264    ///
265    /// Note that RFC 4180 specifies that record terminators should be `\r\n`.
266    /// To use `\r\n`, use the special `Terminator::CRLF` value.
267    ///
268    /// # Example: CRLF
269    ///
270    /// This shows how to use RFC 4180 compliant record terminators.
271    ///
272    /// ```
273    /// use std::error::Error;
274    /// use csv_async::{Terminator, AsyncWriterBuilder};
275    ///
276    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
277    /// async fn example() -> Result<(), Box<dyn Error>> {
278    ///     let mut wtr = AsyncWriterBuilder::new()
279    ///         .terminator(Terminator::CRLF)
280    ///         .create_writer(vec![]);
281    ///     wtr.write_record(&["a", "b", "c"]).await?;
282    ///     wtr.write_record(&["x", "y", "z"]).await?;
283    ///
284    ///     let data = String::from_utf8(wtr.into_inner().await?)?;
285    ///     assert_eq!(data, "a,b,c\r\nx,y,z\r\n");
286    ///     Ok(())
287    /// }
288    /// ```
289    pub fn terminator(&mut self, term: Terminator) -> &mut AsyncWriterBuilder {
290        self.builder.terminator(term.to_core());
291        self
292    }
293
294    /// The quoting style to use when writing CSV.
295    ///
296    /// By default, this is set to `QuoteStyle::Necessary`, which will only
297    /// use quotes when they are necessary to preserve the integrity of data.
298    ///
299    /// Note that unless the quote style is set to `Never`, an empty field is
300    /// quoted if it is the only field in a record.
301    ///
302    /// # Example: non-numeric quoting
303    ///
304    /// This shows how to quote non-numeric fields only.
305    ///
306    /// ```
307    /// use std::error::Error;
308    /// use csv_async::{QuoteStyle, AsyncWriterBuilder};
309    ///
310    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
311    /// async fn example() -> Result<(), Box<dyn Error>> {
312    ///     let mut wtr = AsyncWriterBuilder::new()
313    ///         .quote_style(QuoteStyle::NonNumeric)
314    ///         .create_writer(vec![]);
315    ///     wtr.write_record(&["a", "5", "c"]).await?;
316    ///     wtr.write_record(&["3.14", "y", "z"]).await?;
317    ///
318    ///     let data = String::from_utf8(wtr.into_inner().await?)?;
319    ///     assert_eq!(data, "\"a\",5,\"c\"\n3.14,\"y\",\"z\"\n");
320    ///     Ok(())
321    /// }
322    /// ```
323    ///
324    /// # Example: never quote
325    ///
326    /// This shows how the CSV writer can be made to never write quotes, even
327    /// if it sacrifices the integrity of the data.
328    ///
329    /// ```
330    /// use std::error::Error;
331    /// use csv_async::{QuoteStyle, AsyncWriterBuilder};
332    ///
333    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
334    /// async fn example() -> Result<(), Box<dyn Error>> {
335    ///     let mut wtr = AsyncWriterBuilder::new()
336    ///         .quote_style(QuoteStyle::Never)
337    ///         .create_writer(vec![]);
338    ///     wtr.write_record(&["a", "foo\nbar", "c"]).await?;
339    ///     wtr.write_record(&["g\"h\"i", "y", "z"]).await?;
340    ///
341    ///     let data = String::from_utf8(wtr.into_inner().await?)?;
342    ///     assert_eq!(data, "a,foo\nbar,c\ng\"h\"i,y,z\n");
343    ///     Ok(())
344    /// }
345    /// ```
346    pub fn quote_style(&mut self, style: QuoteStyle) -> &mut AsyncWriterBuilder {
347        self.builder.quote_style(style.to_core());
348        self
349    }
350
351    /// The quote character to use when writing CSV.
352    ///
353    /// The default is `b'"'`.
354    ///
355    /// # Example
356    ///
357    /// ```
358    /// use std::error::Error;
359    /// use csv_async::AsyncWriterBuilder;
360    ///
361    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
362    /// async fn example() -> Result<(), Box<dyn Error>> {
363    ///     let mut wtr = AsyncWriterBuilder::new()
364    ///         .quote(b'\'')
365    ///         .create_writer(vec![]);
366    ///     wtr.write_record(&["a", "foo\nbar", "c"]).await?;
367    ///     wtr.write_record(&["g'h'i", "y\"y\"y", "z"]).await?;
368    ///
369    ///     let data = String::from_utf8(wtr.into_inner().await?)?;
370    ///     assert_eq!(data, "a,'foo\nbar',c\n'g''h''i',y\"y\"y,z\n");
371    ///     Ok(())
372    /// }
373    /// ```
374    pub fn quote(&mut self, quote: u8) -> &mut AsyncWriterBuilder {
375        self.builder.quote(quote);
376        self
377    }
378
379    /// Enable double quote escapes.
380    ///
381    /// This is enabled by default, but it may be disabled. When disabled,
382    /// quotes in field data are escaped instead of doubled.
383    ///
384    /// # Example
385    ///
386    /// ```
387    /// use std::error::Error;
388    /// use csv_async::AsyncWriterBuilder;
389    ///
390    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
391    /// async fn example() -> Result<(), Box<dyn Error>> {
392    ///     let mut wtr = AsyncWriterBuilder::new()
393    ///         .double_quote(false)
394    ///         .create_writer(vec![]);
395    ///     wtr.write_record(&["a", "foo\"bar", "c"]).await?;
396    ///     wtr.write_record(&["x", "y", "z"]).await?;
397    ///
398    ///     let data = String::from_utf8(wtr.into_inner().await?)?;
399    ///     assert_eq!(data, "a,\"foo\\\"bar\",c\nx,y,z\n");
400    ///     Ok(())
401    /// }
402    /// ```
403    pub fn double_quote(&mut self, yes: bool) -> &mut AsyncWriterBuilder {
404        self.builder.double_quote(yes);
405        self
406    }
407
408    /// The escape character to use when writing CSV.
409    ///
410    /// In some variants of CSV, quotes are escaped using a special escape
411    /// character like `\` (instead of escaping quotes by doubling them).
412    ///
413    /// By default, writing these idiosyncratic escapes is disabled, and is
414    /// only used when `double_quote` is disabled.
415    ///
416    /// # Example
417    ///
418    /// ```
419    /// use std::error::Error;
420    /// use csv_async::AsyncWriterBuilder;
421    ///
422    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
423    /// async fn example() -> Result<(), Box<dyn Error>> {
424    ///     let mut wtr = AsyncWriterBuilder::new()
425    ///         .double_quote(false)
426    ///         .escape(b'$')
427    ///         .create_writer(vec![]);
428    ///     wtr.write_record(&["a", "foo\"bar", "c"]).await?;
429    ///     wtr.write_record(&["x", "y", "z"]).await?;
430    ///
431    ///     let data = String::from_utf8(wtr.into_inner().await?)?;
432    ///     assert_eq!(data, "a,\"foo$\"bar\",c\nx,y,z\n");
433    ///     Ok(())
434    /// }
435    /// ```
436    pub fn escape(&mut self, escape: u8) -> &mut AsyncWriterBuilder {
437        self.builder.escape(escape);
438        self
439    }
440
441    /// Use this when you are going to set comment for reader used to read saved file.
442    ///
443    /// If `quote_style` is set to `QuoteStyle::Necessary`, a field will
444    /// be quoted if the comment character is detected anywhere in the field.
445    ///
446    /// The default value is None.
447    ///
448    /// # Example
449    ///
450    /// ```
451    /// use std::error::Error;
452    /// use csv_async::AsyncWriterBuilder;
453    ///
454    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
455    /// async fn example() -> Result<(), Box<dyn Error>> {
456    ///     let mut wtr =
457    ///         AsyncWriterBuilder::new().comment(Some(b'#')).create_writer(Vec::new());
458    ///     wtr.write_record(&["# comment", "another"]).await?;
459    ///     let buf = wtr.into_inner().await?;
460    ///     assert_eq!(String::from_utf8(buf).unwrap(), "\"# comment\",another\n");
461    ///     Ok(())
462    /// }
463    /// ```
464    pub fn comment(&mut self, comment: Option<u8>) -> &mut AsyncWriterBuilder {
465        self.builder.comment(comment);
466        self
467    }
468
469    /// Set the capacity (in bytes) of the internal buffer used in the CSV
470    /// writer. This defaults to a reasonable setting.
471    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut AsyncWriterBuilder {
472        self.capacity = capacity;
473        self
474    }
475}
476
477//-//////////////////////////////////////////////////////////////////////////////////////////////
478//-// Writer
479//-//////////////////////////////////////////////////////////////////////////////////////////////
480
/// Bookkeeping carried by a writer between calls: the record-length policy,
/// the field counts needed to enforce it, and the flush-in-progress flag.
#[derive(Debug)]
struct WriterState {
    /// Whether inconsistent record lengths are allowed.
    flexible: bool,
    /// The number of fields written in the first record. This is compared
    /// with `fields_written` on all subsequent records to check for
    /// inconsistent record lengths.
    first_field_count: Option<u64>,
    /// The number of fields written in this record. This is used to report
    /// errors for inconsistent record lengths if `flexible` is disabled.
    fields_written: u64,
    /// This is set immediately before flushing the buffer and then unset
    /// immediately after flushing the buffer. This avoids flushing the buffer
    /// twice if the inner writer panics.
    panicked: bool,
}
497
/// Fixed-size scratch space that collects encoded CSV bytes until they are
/// handed to the underlying writer.
///
/// The `csv_core` APIs write into a `&mut [u8]`, which the
/// `std::io::BufWriter` API does not provide, hence this small dedicated type.
#[derive(Debug)]
struct Buffer {
    /// Backing storage for the buffered bytes.
    buf: Vec<u8>,
    /// How many bytes at the front of `buf` have been written so far.
    len: usize,
}

impl Buffer {
    /// Borrows the bytes written to the buffer so far.
    ///
    /// The result is empty when nothing has been written.
    #[inline]
    fn readable(&self) -> &[u8] {
        let (pending, _free) = self.buf.split_at(self.len);
        pending
    }

    /// Mutably borrows the space that has not been written to yet.
    ///
    /// The result is empty when the buffer is full.
    #[inline]
    fn writable(&mut self) -> &mut [u8] {
        let (_pending, free) = self.buf.split_at_mut(self.len);
        free
    }

    /// Records that `n` more bytes have been placed into the writable space.
    #[inline]
    fn written(&mut self, n: usize) {
        self.len += n;
    }

    /// Marks the whole buffer as writable again; the storage itself is left
    /// untouched.
    #[inline]
    fn clear(&mut self) {
        self.len = 0;
    }
}
539
/// CSV async writer internal implementation used by both record writer and serializer.
///
#[derive(Debug)]
pub struct AsyncWriterImpl<W: AsyncWrite + Unpin> {
    /// The `csv_core` writer that encodes fields, delimiters and terminators.
    core: CoreWriter,
    /// The underlying writer. It is `Some` from construction until
    /// `into_inner` takes it out.
    wtr: Option<W>,
    /// Holds encoded bytes until they are flushed to `wtr`.
    buf: Buffer,
    /// Record-length bookkeeping and the flush-in-progress flag.
    state: WriterState,
}
549
550impl<W: AsyncWrite + Unpin> Drop for AsyncWriterImpl<W> {
551    fn drop(&mut self) {
552        if self.wtr.is_some() && !self.state.panicked {
553            // We ignore result of flush() call while dropping
554            // Well known problem.
555            // If you care about flush result call it explicitly 
556            // before AsyncWriter goes out of scope,
557            // second flush() call should be no op.
558            let _ = futures::executor::block_on(self.flush());
559        }
560    }
561}
562
563impl<W: AsyncWrite + Unpin> AsyncWriterImpl<W> {
564    fn new(builder: &AsyncWriterBuilder, wtr: W) -> AsyncWriterImpl<W> {
565        AsyncWriterImpl {
566            core: builder.builder.build(),
567            wtr: Some(wtr),
568            buf: Buffer { buf: vec![0; builder.capacity], len: 0 },
569            state: WriterState {
570                flexible: builder.flexible,
571                first_field_count: None,
572                fields_written: 0,
573                panicked: false,
574            },
575        }
576    }
577
578    /// Write a single record.
579    ///
580    pub async fn write_record<I, T>(&mut self, record: I) -> Result<()>
581    where
582        I: IntoIterator<Item = T>,
583        T: AsRef<[u8]>,
584    {
585        for field in record.into_iter() {
586            self.write_field_impl(field).await?;
587        }
588        self.write_terminator().await
589    }
590
591    /// Write a single `ByteRecord`.
592    ///
593    #[inline(never)]
594    pub async fn write_byte_record(&mut self, record: &ByteRecord) -> Result<()> {
595        if record.as_slice().is_empty() {
596            return self.write_record(record).await;
597        }
598        // The idea here is to find a fast path for shuffling our record into
599        // our buffer as quickly as possible. We do this because the underlying
600        // "core" CSV writer does a lot of book-keeping to maintain its state
601        // oriented API.
602        //
603        // The fast path occurs when we know our record will fit in whatever
604        // space we have left in our buffer. We can actually quickly compute
605        // the upper bound on the space required:
606        let upper_bound =
607            // The data itself plus the worst case: every byte is a quote.
608            (2 * record.as_slice().len())
609            // The number of field delimiters.
610            + (record.len().saturating_sub(1))
611            // The maximum number of quotes inserted around each field.
612            + (2 * record.len())
613            // The maximum number of bytes for the terminator.
614            + 2;
615        if self.buf.writable().len() < upper_bound {
616            return self.write_record(record).await;
617        }
618        let mut first = true;
619        for field in record.iter() {
620            if !first {
621                self.buf.writable()[0] = self.core.get_delimiter();
622                self.buf.written(1);
623            }
624            first = false;
625
626            if !self.core.should_quote(field) {
627                self.buf.writable()[..field.len()].copy_from_slice(field);
628                self.buf.written(field.len());
629            } else {
630                self.buf.writable()[0] = self.core.get_quote();
631                self.buf.written(1);
632                let (res, nin, nout) = csv_core::quote(
633                    field,
634                    self.buf.writable(),
635                    self.core.get_quote(),
636                    self.core.get_escape(),
637                    self.core.get_double_quote(),
638                );
639                debug_assert!(res == WriteResult::InputEmpty);
640                debug_assert!(nin == field.len());
641                self.buf.written(nout);
642                self.buf.writable()[0] = self.core.get_quote();
643                self.buf.written(1);
644            }
645        }
646        self.state.fields_written = record.len() as u64;
647        self.write_terminator_into_buffer()
648    }
649
650    /// Write a single field.
651    ///
652    pub async fn write_field<T: AsRef<[u8]>>(&mut self, field: T) -> Result<()> {
653        self.write_field_impl(field).await
654    }
655
656    /// Implementation of write_field.
657    ///
658    /// This is a separate method so we can force the compiler to inline it
659    /// into write_record.
660    #[inline(always)]
661    async fn write_field_impl<T: AsRef<[u8]>>(&mut self, field: T) -> Result<()> {
662        if self.state.fields_written > 0 {
663            self.write_delimiter().await?;
664        }
665        let mut field = field.as_ref();
666        loop {
667            let (res, nin, nout) = self.core.field(field, self.buf.writable());
668            field = &field[nin..];
669            self.buf.written(nout);
670            match res {
671                WriteResult::InputEmpty => {
672                    self.state.fields_written += 1;
673                    return Ok(());
674                }
675                WriteResult::OutputFull => self.flush_buf().await?,
676            }
677        }
678    }
679
680    /// Flush the contents of the internal buffer to the underlying writer.
681    ///
682    /// If there was a problem writing to the underlying writer, then an error
683    /// is returned.
684    ///
685    /// Note that this also flushes the underlying writer.
686    pub async fn flush(&mut self) -> io::Result<()> {
687        self.flush_buf().await?;
688        self.wtr.as_mut().unwrap().flush().await?;
689        Ok(())
690    }
691
692    /// Flush the contents of the internal buffer to the underlying writer,
693    /// without flushing the underlying writer.
694    async fn flush_buf(&mut self) -> io::Result<()> {
695        self.state.panicked = true;
696        let result = self.wtr.as_mut().unwrap().write_all(self.buf.readable()).await;
697        self.state.panicked = false;
698        result?;
699        self.buf.clear();
700        Ok(())
701    }
702
703    /// Flush the contents of the internal buffer and return the underlying
704    /// writer.
705    pub async fn into_inner(
706        mut self,
707    ) -> result::Result<W, IntoInnerError<AsyncWriterImpl<W>>> {
708        match self.flush().await {
709            Ok(()) => Ok(self.wtr.take().unwrap()),
710            Err(err) => Err(IntoInnerError::new(self, err)),
711        }
712    }
713
714    /// Write a CSV delimiter.
715    async fn write_delimiter(&mut self) -> Result<()> {
716        loop {
717            let (res, nout) = self.core.delimiter(self.buf.writable());
718            self.buf.written(nout);
719            match res {
720                WriteResult::InputEmpty => return Ok(()),
721                WriteResult::OutputFull => self.flush_buf().await?,
722            }
723        }
724    }
725
726    /// Write a CSV terminator.
727    async fn write_terminator(&mut self) -> Result<()> {
728        self.check_field_count()?;
729        loop {
730            let (res, nout) = self.core.terminator(self.buf.writable());
731            self.buf.written(nout);
732            match res {
733                WriteResult::InputEmpty => {
734                    self.state.fields_written = 0;
735                    return Ok(());
736                }
737                WriteResult::OutputFull => self.flush_buf().await?,
738            }
739        }
740    }
741
742    /// Write a CSV terminator that is guaranteed to fit into the current buffer.
743    /// 
744    #[inline(never)]
745    fn write_terminator_into_buffer(&mut self) -> Result<()> {
746        self.check_field_count()?;
747        match self.core.get_terminator() {
748            csv_core::Terminator::CRLF => {
749                self.buf.writable()[0] = b'\r';
750                self.buf.writable()[1] = b'\n';
751                self.buf.written(2);
752            }
753            csv_core::Terminator::Any(b) => {
754                self.buf.writable()[0] = b;
755                self.buf.written(1);
756            }
757            _ => unreachable!(),
758        }
759        self.state.fields_written = 0;
760        Ok(())
761    }
762
763    fn check_field_count(&mut self) -> Result<()> {
764        if !self.state.flexible {
765            match self.state.first_field_count {
766                None => {
767                    self.state.first_field_count =
768                        Some(self.state.fields_written);
769                }
770                Some(expected) if expected != self.state.fields_written => {
771                    return Err(Error::new(ErrorKind::UnequalLengths {
772                        pos: None,
773                        expected_len: expected,
774                        len: self.state.fields_written,
775                    }))
776                }
777                Some(_) => {}
778            }
779        }
780        Ok(())
781    }
782}