csv_async/async_readers/
mod.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::result;
4use std::task::{Context, Poll};
5
6cfg_if::cfg_if! {
7if #[cfg(feature = "tokio")] {
8    use tokio::io::{self, AsyncBufRead, AsyncSeekExt};
9    use tokio_stream::Stream;
10} else {
11    use futures::io::{self, AsyncBufRead, AsyncSeekExt};
12    use futures::stream::Stream;
13}}
14
15use csv_core::ReaderBuilder as CoreReaderBuilder;
16use csv_core::Reader as CoreReader;
17#[cfg(feature = "with_serde")]
18use serde::de::DeserializeOwned;
19
20use crate::{Terminator, Trim};
21use crate::byte_record::{ByteRecord, Position};
22use crate::error::{Error, ErrorKind, Result, Utf8Error};
23use crate::string_record::StringRecord;
24
25cfg_if::cfg_if! {
26if #[cfg(feature = "tokio")] {
27    pub mod ardr_tokio;
28} else {
29    pub mod ardr_futures;
30}}
31
32#[cfg(all(feature = "with_serde", not(feature = "tokio")))]
33pub mod ades_futures;
34
35#[cfg(all(feature = "with_serde", feature = "tokio"))]
36pub mod ades_tokio;
37
38//-//////////////////////////////////////////////////////////////////////////////////////////////
39//-// Builder
40//-//////////////////////////////////////////////////////////////////////////////////////////////
41
/// Builds a CSV reader with various configuration knobs.
///
/// This builder can be used to tweak the field delimiter, record terminator
/// and more. Once a CSV reader / deserializer is built, its configuration cannot be
/// changed.
#[derive(Debug)]
pub struct AsyncReaderBuilder {
    /// Capacity, in bytes, handed to the buffered reader that wraps the
    /// underlying source.
    capacity: usize,
    /// When `true`, records may have differing numbers of fields.
    flexible: bool,
    /// When `true`, the first row is treated as a header row.
    has_headers: bool,
    /// Which values (headers, fields, both or none) get whitespace-trimmed.
    trim: Trim,
    /// When `true`, the record stream ends at the first i/o error.
    end_on_io_error: bool,
    /// The underlying CSV parser builder.
    ///
    /// We explicitly put this on the heap because CoreReaderBuilder embeds an
    /// entire DFA transition table, which along with other things, tallies up
    /// to almost 500 bytes on the stack.
    builder: Box<CoreReaderBuilder>,
}
61
62impl Default for AsyncReaderBuilder {
63    fn default() -> AsyncReaderBuilder {
64        AsyncReaderBuilder {
65            capacity: 8 * (1 << 10),
66            flexible: false,
67            has_headers: true,
68            trim: Trim::default(),
69            end_on_io_error: true,
70            builder: Box::<CoreReaderBuilder>::default(),
71        }
72    }
73}
74
75impl AsyncReaderBuilder {
76    /// Create a new builder for configuring CSV parsing.
77    ///
78    /// To convert a builder into a reader, call one of the methods starting
79    /// with `from_`.
80    ///
81    /// # Example
82    ///
83    /// ```
84    /// use std::error::Error;
85    /// use futures::stream::StreamExt;
86    /// use csv_async::{AsyncReaderBuilder, StringRecord};
87    ///
88    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
89    /// async fn example() -> Result<(), Box<dyn Error>> {
90    ///     let data = "\
91    /// city,country,pop
92    /// Boston,United States,4628910
93    /// Concord,United States,42695
94    /// ";
95    ///     let mut rdr = AsyncReaderBuilder::new().create_reader(data.as_bytes());
96    ///
97    ///     let records = rdr
98    ///         .records()
99    ///         .map(Result::unwrap)
100    ///         .collect::<Vec<StringRecord>>().await;
101    ///     assert_eq!(records, vec![
102    ///         vec!["Boston", "United States", "4628910"],
103    ///         vec!["Concord", "United States", "42695"],
104    ///     ]);
105    ///     Ok(())
106    /// }
107    /// ```
108    pub fn new() -> AsyncReaderBuilder {
109        AsyncReaderBuilder::default()
110    }
111
112    /// The field delimiter to use when parsing CSV.
113    ///
114    /// The default is `b','`.
115    ///
116    /// # Example
117    ///
118    /// ```
119    /// use std::error::Error;
120    /// use futures::stream::StreamExt;
121    /// use csv_async::{AsyncReaderBuilder, StringRecord};
122    ///
123    /// # fn main() { async_std::task::block_on(async {example().await}); }
124    /// async fn example() {
125    ///     let data = "\
126    /// city;country;pop
127    /// Boston;United States;4628910
128    /// ";
129    ///     let mut rdr = AsyncReaderBuilder::new()
130    ///         .delimiter(b';')
131    ///         .create_reader(data.as_bytes());
132    ///
133    ///     let records = rdr
134    ///         .records()
135    ///         .map(Result::unwrap)
136    ///         .collect::<Vec<StringRecord>>().await;
137    ///     assert_eq!(records, vec![
138    ///         vec!["Boston", "United States", "4628910"],
139    ///     ]);
140     /// }
141    /// ```
142    pub fn delimiter(&mut self, delimiter: u8) -> &mut AsyncReaderBuilder {
143        self.builder.delimiter(delimiter);
144        self
145    }
146
147    /// Whether to treat the first row as a special header row.
148    ///
149    /// By default, the first row is treated as a special header row, which
150    /// means the header is never returned by any of the record reading methods
151    /// or iterators. When this is disabled (`yes` set to `false`), the first
152    /// row is not treated specially.
153    ///
154    /// Note that the `headers` and `byte_headers` methods are unaffected by
155    /// whether this is set. Those methods always return the first record.
156    ///
157    /// # Example
158    ///
159    /// This example shows what happens when `has_headers` is disabled.
160    /// Namely, the first row is treated just like any other row.
161    ///
162    /// ```
163    /// use std::error::Error;
164    /// use futures::stream::StreamExt;
165    /// use csv_async::AsyncReaderBuilder;
166    ///
167    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
168    /// async fn example() -> Result<(), Box<dyn Error>> {
169    ///     let data = "\
170    /// city,country,pop
171    /// Boston,United States,4628910
172    /// ";
173    ///     let mut rdr = AsyncReaderBuilder::new()
174    ///         .has_headers(false)
175    ///         .create_reader(data.as_bytes());
176    ///     let mut iter = rdr.records();
177    ///
178    ///     // Read the first record.
179    ///     assert_eq!(iter.next().await.unwrap()?, vec!["city", "country", "pop"]);
180    ///
181    ///     // Read the second record.
182    ///     assert_eq!(iter.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
183    ///
184    ///     assert!(iter.next().await.is_none());
185    ///     Ok(())
186    /// }
187    /// ```
188    pub fn has_headers(&mut self, yes: bool) -> &mut AsyncReaderBuilder {
189        self.has_headers = yes;
190        self
191    }
192
193    /// Whether the number of fields in records is allowed to change or not.
194    ///
195    /// When disabled (which is the default), parsing CSV data will return an
196    /// error if a record is found with a number of fields different from the
197    /// number of fields in a previous record.
198    ///
199    /// When enabled, this error checking is turned off.
200    ///
201    /// # Example: flexible records enabled
202    ///
203    /// ```
204    /// use std::error::Error;
205    /// use futures::stream::StreamExt;
206    /// use csv_async::AsyncReaderBuilder;
207    ///
208    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
209    /// async fn example() -> Result<(), Box<dyn Error>> {
210    ///     // Notice that the first row is missing the population count.
211    ///     let data = "\
212    /// city,country,pop
213    /// Boston,United States
214    /// ";
215    ///     let mut rdr = AsyncReaderBuilder::new()
216    ///         .flexible(true)
217    ///         .create_reader(data.as_bytes());
218    ///     let mut records = rdr.records();
219    ///     assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States"]);
220    ///     Ok(())
221    /// }
222    /// ```
223    ///
224    /// # Example: flexible records disabled
225    ///
226    /// This shows the error that appears when records of unequal length
227    /// are found and flexible records have been disabled (which is the
228    /// default).
229    ///
230    /// ```
231    /// use std::error::Error;
232    /// use futures::stream::StreamExt;
233    /// use csv_async::{ErrorKind, AsyncReaderBuilder};
234    ///
235    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
236    /// async fn example() -> Result<(), Box<dyn Error>> {
237    ///     // Notice that the first row is missing the population count.
238    ///     let data = "\
239    /// city,country,pop
240    /// Boston,United States
241    /// ";
242    ///     let mut rdr = AsyncReaderBuilder::new()
243    ///         .flexible(false)
244    ///         .create_reader(data.as_bytes());
245    ///
246    ///     let mut records = rdr.records();
247    ///     match records.next().await {
248    ///         Some(Err(err)) => match *err.kind() {
249    ///             ErrorKind::UnequalLengths { expected_len, len, .. } => {
250    ///                 // The header row has 3 fields...
251    ///                 assert_eq!(expected_len, 3);
252    ///                 // ... but the first row has only 2 fields.
253    ///                 assert_eq!(len, 2);
254    ///                 Ok(())
255    ///             }
256    ///             ref wrong => {
257    ///                 Err(From::from(format!(
258    ///                     "expected UnequalLengths error but got {:?}",
259    ///                     wrong)))
260    ///             }
261    ///         }
262    ///         Some(Ok(rec)) =>
263    ///             Err(From::from(format!(
264    ///                 "expected one errored record but got good record {:?}",
265    ///                  rec))),
266    ///         None =>
267    ///            Err(From::from(
268    ///                "expected one errored record but got none"))
269    ///     }
270    /// }
271    /// ```
272    pub fn flexible(&mut self, yes: bool) -> &mut AsyncReaderBuilder {
273        self.flexible = yes;
274        self
275    }
276
277    /// If set, CSV records' stream will end when first i/o error happens.
278    /// Otherwise CSV reader will continue trying to read from underlying reader.
279    /// For sample, please see unit test `behavior_on_io_errors` in following
280    /// [source file](https://github.com/gwierzchowski/csv-async/blob/master/src/async_readers/ardr_futures.rs).
281    /// 
282    /// By default this option is set.
283    pub fn end_on_io_error(&mut self, yes: bool) -> &mut AsyncReaderBuilder {
284        self.end_on_io_error = yes;
285        self
286    }
287
288    /// Whether fields are trimmed of leading and trailing whitespace or not.
289    ///
290    /// By default, no trimming is performed. This method permits one to
291    /// override that behavior and choose one of the following options:
292    ///
293    /// 1. `Trim::Headers` trims only header values.
294    /// 2. `Trim::Fields` trims only non-header or "field" values.
295    /// 3. `Trim::All` trims both header and non-header values.
296    ///
297    /// A value is only interpreted as a header value if this CSV reader is
298    /// configured to read a header record (which is the default).
299    ///
300    /// When reading string records, characters meeting the definition of
301    /// Unicode whitespace are trimmed. When reading byte records, characters
302    /// meeting the definition of ASCII whitespace are trimmed. ASCII
303    /// whitespace characters correspond to the set `[\t\n\v\f\r ]`.
304    ///
305    /// # Example
306    ///
307    /// This example shows what happens when all values are trimmed.
308    ///
309    /// ```
310    /// use std::error::Error;
311    /// use futures::stream::StreamExt;
312    /// use csv_async::{AsyncReaderBuilder, StringRecord, Trim};
313    ///
314    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
315    /// async fn example() -> Result<(), Box<dyn Error>> {
316    ///     let data = "\
317    /// city ,   country ,  pop
318    /// Boston,\"
319    ///    United States\",4628910
320    /// Concord,   United States   ,42695
321    /// ";
322    ///     let mut rdr = AsyncReaderBuilder::new()
323    ///         .trim(Trim::All)
324    ///         .create_reader(data.as_bytes());
325    ///     let records = rdr
326    ///         .records()
327    ///         .map(Result::unwrap)
328    ///         .collect::<Vec<StringRecord>>().await;
329    ///     assert_eq!(records, vec![
330    ///         vec!["Boston", "United States", "4628910"],
331    ///         vec!["Concord", "United States", "42695"],
332    ///     ]);
333    ///     Ok(())
334    /// }
335    /// ```
336    pub fn trim(&mut self, trim: Trim) -> &mut AsyncReaderBuilder {
337        self.trim = trim;
338        self
339    }
340
341    /// The record terminator to use when parsing CSV.
342    ///
343    /// A record terminator can be any single byte. The default is a special
344    /// value, `Terminator::CRLF`, which treats any occurrence of `\r`, `\n`
345    /// or `\r\n` as a single record terminator.
346    ///
347    /// # Example: `$` as a record terminator
348    ///
349    /// ```
350    /// use std::error::Error;
351    /// use futures::stream::StreamExt;
352    /// use csv_async::{AsyncReaderBuilder, Terminator};
353    ///
354    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
355    /// async fn example() -> Result<(), Box<dyn Error>> {
356    ///     let data = "city,country,pop$Boston,United States,4628910";
357    ///     let mut rdr = AsyncReaderBuilder::new()
358    ///         .terminator(Terminator::Any(b'$'))
359    ///         .create_reader(data.as_bytes());
360    ///     let mut iter = rdr.records();
361    ///     assert_eq!(iter.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
362    ///     assert!(iter.next().await.is_none());
363    ///     Ok(())
364    /// }
365    /// ```
366    pub fn terminator(&mut self, term: Terminator) -> &mut AsyncReaderBuilder {
367        self.builder.terminator(term.to_core());
368        self
369    }
370
371    /// The quote character to use when parsing CSV.
372    ///
373    /// The default is `b'"'`.
374    ///
375    /// # Example: single quotes instead of double quotes
376    ///
377    /// ```
378    /// use std::error::Error;
379    /// use futures::stream::StreamExt;
380    /// use csv_async::AsyncReaderBuilder;
381    ///
382    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
383    /// async fn example() -> Result<(), Box<dyn Error>> {
384    ///     let data = "\
385    /// city,country,pop
386    /// Boston,'United States',4628910
387    /// ";
388    ///     let mut rdr = AsyncReaderBuilder::new()
389    ///         .quote(b'\'')
390    ///         .create_reader(data.as_bytes());
391    ///     let mut iter = rdr.records();
392    ///     assert_eq!(iter.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
393    ///     assert!(iter.next().await.is_none());
394    ///     Ok(())
395    /// }
396    /// ```
397    pub fn quote(&mut self, quote: u8) -> &mut AsyncReaderBuilder {
398        self.builder.quote(quote);
399        self
400    }
401
402    /// The escape character to use when parsing CSV.
403    ///
404    /// In some variants of CSV, quotes are escaped using a special escape
405    /// character like `\` (instead of escaping quotes by doubling them).
406    ///
407    /// By default, recognizing these idiosyncratic escapes is disabled.
408    ///
409    /// # Example
410    ///
411    /// ```
412    /// use std::error::Error;
413    /// use futures::stream::StreamExt;
414    /// use csv_async::AsyncReaderBuilder;
415    ///
416    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
417    /// async fn example() -> Result<(), Box<dyn Error>> {
418    ///     let data = "\
419    /// city,country,pop
420    /// Boston,\"The \\\"United\\\" States\",4628910
421    /// ";
422    ///     let mut rdr = AsyncReaderBuilder::new()
423    ///         .escape(Some(b'\\'))
424    ///         .create_reader(data.as_bytes());
425    ///     let mut records = rdr.records();
426    ///     assert_eq!(records.next().await.unwrap()?, vec!["Boston", "The \"United\" States", "4628910"]);
427    ///     Ok(())
428    /// }
429    /// ```
430    pub fn escape(&mut self, escape: Option<u8>) -> &mut AsyncReaderBuilder {
431        self.builder.escape(escape);
432        self
433    }
434
435    /// Enable double quote escapes.
436    ///
437    /// This is enabled by default, but it may be disabled. When disabled,
438    /// doubled quotes are not interpreted as escapes.
439    ///
440    /// # Example
441    ///
442    /// ```
443    /// use std::error::Error;
444    /// use futures::stream::StreamExt;
445    /// use csv_async::AsyncReaderBuilder;
446    ///
447    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
448    /// async fn example() -> Result<(), Box<dyn Error>> {
449    ///     let data = "\
450    /// city,country,pop
451    /// Boston,\"The \"\"United\"\" States\",4628910
452    /// ";
453    ///     let mut rdr = AsyncReaderBuilder::new()
454    ///         .double_quote(false)
455    ///         .create_reader(data.as_bytes());
456    ///     let mut records = rdr.records();
457    ///     assert_eq!(records.next().await.unwrap()?, vec!["Boston", "The \"United\"\" States\"", "4628910"]);
458    ///     Ok(())
459    /// }
460    /// ```
461    pub fn double_quote(&mut self, yes: bool) -> &mut AsyncReaderBuilder {
462        self.builder.double_quote(yes);
463        self
464    }
465
466    /// Enable or disable quoting.
467    ///
468    /// This is enabled by default, but it may be disabled. When disabled,
469    /// quotes are not treated specially.
470    ///
471    /// # Example
472    ///
473    /// ```
474    /// use std::error::Error;
475    /// use futures::stream::StreamExt;
476    /// use csv_async::AsyncReaderBuilder;
477    ///
478    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
479    /// async fn example() -> Result<(), Box<dyn Error>> {
480    ///     let data = "\
481    /// city,country,pop
482    /// Boston,\"The United States,4628910
483    /// ";
484    ///     let mut rdr = AsyncReaderBuilder::new()
485    ///         .quoting(false)
486    ///         .create_reader(data.as_bytes());
487    ///     let mut records = rdr.records();
488    ///     assert_eq!(records.next().await.unwrap()?, vec!["Boston", "\"The United States", "4628910"]);
489    ///     Ok(())
490    /// }
491    /// ```
492    pub fn quoting(&mut self, yes: bool) -> &mut AsyncReaderBuilder {
493        self.builder.quoting(yes);
494        self
495    }
496
497    /// The comment character to use when parsing CSV.
498    ///
499    /// If the start of a record begins with the byte given here, then that
500    /// line is ignored by the CSV parser.
501    ///
502    /// This is disabled by default.
503    ///
504    /// # Example
505    ///
506    /// ```
507    /// use std::error::Error;
508    /// use futures::stream::StreamExt;
509    /// use csv_async::AsyncReaderBuilder;
510    ///
511    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
512    /// async fn example() -> Result<(), Box<dyn Error>> {
513    ///     let data = "\
514    /// city,country,pop
515    /// #Concord,United States,42695
516    /// Boston,United States,4628910
517    /// ";
518    ///     let mut rdr = AsyncReaderBuilder::new()
519    ///         .comment(Some(b'#'))
520    ///         .create_reader(data.as_bytes());
521    ///     let mut records = rdr.records();
522    ///     assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
523    ///     assert!(records.next().await.is_none());
524    ///     Ok(())
525    /// }
526    /// ```
527    pub fn comment(&mut self, comment: Option<u8>) -> &mut AsyncReaderBuilder {
528        self.builder.comment(comment);
529        self
530    }
531
532    /// A convenience method for specifying a configuration to read ASCII
533    /// delimited text.
534    ///
535    /// This sets the delimiter and record terminator to the ASCII unit
536    /// separator (`\x1F`) and record separator (`\x1E`), respectively.
537    ///
538    /// # Example
539    ///
540    /// ```
541    /// use std::error::Error;
542    /// use futures::stream::StreamExt;
543    /// use csv_async::AsyncReaderBuilder;
544    ///
545    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
546    /// async fn example() -> Result<(), Box<dyn Error>> {
547    ///     let data = "\
548    /// city\x1Fcountry\x1Fpop\x1EBoston\x1FUnited States\x1F4628910";
549    ///     let mut rdr = AsyncReaderBuilder::new()
550    ///         .ascii()
551    ///         .create_reader(data.as_bytes());
552    ///     let mut records = rdr.byte_records();
553    ///     assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
554    ///     assert!(records.next().await.is_none());
555    ///     Ok(())
556    /// }
557    /// ```
558    pub fn ascii(&mut self) -> &mut AsyncReaderBuilder {
559        self.builder.ascii();
560        self
561    }
562
563    /// Set the capacity (in bytes) of the buffer used in the CSV reader.
564    /// This defaults to a reasonable setting.
565    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut AsyncReaderBuilder {
566        self.capacity = capacity;
567        self
568    }
569
570    /// Enable or disable the NFA for parsing CSV.
571    ///
572    /// This is intended to be a debug option. The NFA is always slower than
573    /// the DFA.
574    #[doc(hidden)]
575    pub fn nfa(&mut self, yes: bool) -> &mut AsyncReaderBuilder {
576        self.builder.nfa(yes);
577        self
578    }
579}
580
581//-//////////////////////////////////////////////////////////////////////////////////////////////
582//-// Reader
583//-//////////////////////////////////////////////////////////////////////////////////////////////
584
/// Bookkeeping state of a CSV reader that is tracked outside of the
/// `csv_core` parser: cached headers, configuration flags copied from the
/// builder, the current position and the end-of-input status.
#[derive(Debug)]
pub struct ReaderState {
    /// When set, this contains the first row of any parsed CSV data.
    ///
    /// This is always populated, regardless of whether `has_headers` is set.
    headers: Option<Headers>,
    /// When set, the first row of parsed CSV data is excluded from things
    /// that read records, like iterators and `read_record`.
    has_headers: bool,
    /// When set, there is no restriction on the length of records. When not
    /// set, every record must have the same number of fields, or else an error
    /// is reported.
    flexible: bool,
    /// Which values (headers, fields, both or none) are trimmed of
    /// surrounding whitespace.
    trim: Trim,
    /// The number of fields in the first record parsed.
    ///
    /// Only recorded while `flexible` is not set.
    first_field_count: Option<u64>,
    /// The current position of the parser.
    ///
    /// Note that this position is only observable by callers at the start
    /// of a record. More granular positions are not supported.
    cur_pos: Position,
    /// Whether the first record has been read or not.
    first: bool,
    /// Whether the reader has been seeked or not.
    seeked: bool,
    /// If set, CSV records' stream will end when first i/o error happens.
    /// Otherwise it will continue trying to read from underlying reader.
    end_on_io_error: bool,
    /// IO errors on the underlying reader will be considered as an EOF for
    /// subsequent read attempts, as it would be incorrect to keep on trying
    /// to read when the underlying reader has broken.
    ///
    /// For clarity, having the best `Debug` impl and in case they need to be
    /// treated differently at some point, we store whether the `EOF` is
    /// considered because an actual EOF happened, or because we encountered
    /// an IO error.
    /// This has no additional runtime cost.
    eof: ReaderEofState,
}
624
/// Whether EOF of the underlying reader has been reached or not.
///
/// IO errors on the underlying reader will be considered as an EOF for
/// subsequent read attempts, as it would be incorrect to keep on trying
/// to read when the underlying reader has broken.
///
/// For clarity, having the best `Debug` impl and in case they need to be
/// treated differently at some point, we store whether the `EOF` is
/// considered because an actual EOF happened, or because we encountered
/// an IO error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReaderEofState {
    /// No end-of-input condition has been recorded; a new reader starts here.
    NotEof,
    /// An actual EOF happened on the underlying reader.
    Eof,
    /// An IO error was encountered on the underlying reader.
    IOError,
}
641
/// Headers encapsulates any data associated with the headers of CSV data.
///
/// The headers always correspond to the first row. Both representations are
/// kept so that either can be returned by reference.
#[derive(Debug)]
struct Headers {
    /// The header, as raw bytes.
    byte_record: ByteRecord,
    /// The header, as valid UTF-8 (or a UTF-8 error).
    ///
    /// Holds the error when the raw header bytes could not be converted into
    /// a `StringRecord`.
    string_record: result::Result<StringRecord, Utf8Error>,
}
652
653impl ReaderState {
654    #[inline(always)]
655    fn add_record(&mut self, record: &ByteRecord) -> Result<()> {
656        let i = self.cur_pos.record();
657        self.cur_pos.set_record(i.checked_add(1).unwrap());
658        if !self.flexible {
659            match self.first_field_count {
660                None => self.first_field_count = Some(record.len() as u64),
661                Some(expected) => {
662                    if record.len() as u64 != expected {
663                        return Err(Error::new(ErrorKind::UnequalLengths {
664                            pos: record.position().map(Clone::clone),
665                            expected_len: expected,
666                            len: record.len() as u64,
667                        }));
668                    }
669                }
670            }
671        }
672        Ok(())
673    }
674}
/// CSV async reader internal implementation used by both record reader and deserializer.
///
/// Combines the `csv_core` parser, a buffered wrapper around the underlying
/// byte source and the reader-level bookkeeping state.
#[derive(Debug)]
pub struct AsyncReaderImpl<R> {
    /// The underlying CSV parser.
    ///
    /// We explicitly put this on the heap because CoreReader embeds an entire
    /// DFA transition table, which along with other things, tallies up to
    /// almost 500 bytes on the stack.
    core: Box<CoreReader>,
    /// The underlying reader.
    rdr: io::BufReader<R>,
    /// Various state tracking.
    ///
    /// There is more state embedded in the `CoreReader`.
    state: ReaderState,
}
692
/// Future that polls `poll_fill_buf` on the borrowed reader and resolves to
/// the length of the buffer it returns (or to the i/o error it reports).
#[must_use = "futures do nothing unless you `.await` or poll them"]
struct FillBuf<'a, R: AsyncBufRead + ?Sized> {
    /// The buffered reader whose buffer is filled when the future is polled.
    reader: &'a mut R,
}
697
// Explicitly mark the future as `Unpin` whenever the borrowed reader is `Unpin`.
impl<R: AsyncBufRead + ?Sized + Unpin> Unpin for FillBuf<'_, R> {}
699
700impl<'a, R: AsyncBufRead + ?Sized + Unpin> FillBuf<'a, R> {
701    pub fn new(reader: &'a mut R) -> Self {
702        Self { reader }
703    }
704}
705
706impl<R: AsyncBufRead + ?Sized + Unpin> Future for FillBuf<'_, R> {
707    type Output = io::Result<usize>;
708    
709    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
710        match Pin::new(&mut *self.reader).poll_fill_buf(cx) {
711            Poll::Ready(res) => {
712                match res {
713                    Ok(res) => Poll::Ready(Ok(res.len())),
714                    Err(e) => Poll::Ready(Err(e))
715                }
716            },
717            Poll::Pending => Poll::Pending
718        }
719    }
720}
721
722impl<'r, R> AsyncReaderImpl<R>
723where
724    R: io::AsyncRead + Unpin + 'r,
725{
726    /// Create a new CSV reader given a builder and a source of underlying
727    /// bytes.
728    fn new(builder: &AsyncReaderBuilder, rdr: R) -> AsyncReaderImpl<R> {
729        AsyncReaderImpl {
730            core: Box::new(builder.builder.build()),
731            rdr: io::BufReader::with_capacity(builder.capacity, rdr),
732            state: ReaderState {
733                headers: None,
734                has_headers: builder.has_headers,
735                flexible: builder.flexible,
736                trim: builder.trim,
737                end_on_io_error: builder.end_on_io_error,
738                first_field_count: None,
739                cur_pos: Position::new(),
740                first: false,
741                seeked: false,
742                eof: ReaderEofState::NotEof,
743            },
744        }
745    }
746
747    /// Returns a reference to the first row read by this parser.
748    ///
749    pub async fn headers(&mut self) -> Result<&StringRecord> {
750        if self.state.headers.is_none() {
751            let mut record = ByteRecord::new();
752            self.read_byte_record_impl(&mut record).await?;
753            self.set_headers_impl(Err(record));
754        }
755        let headers = self.state.headers.as_ref().unwrap();
756        match headers.string_record {
757            Ok(ref record) => Ok(record),
758            Err(ref err) => Err(Error::new(ErrorKind::Utf8 {
759                pos: headers.byte_record.position().map(Clone::clone),
760                err: err.clone(),
761            })),
762        }
763    }
764
765    /// Returns a reference to the first row read by this parser as raw bytes.
766    ///
767    pub async fn byte_headers(&mut self) -> Result<&ByteRecord> {
768        if self.state.headers.is_none() {
769            let mut record = ByteRecord::new();
770            self.read_byte_record_impl(&mut record).await?;
771            self.set_headers_impl(Err(record));
772        }
773        Ok(&self.state.headers.as_ref().unwrap().byte_record)
774    }
775
776    /// Set the headers of this CSV parser manually.
777    ///
778    pub fn set_headers(&mut self, headers: StringRecord) {
779        self.set_headers_impl(Ok(headers));
780    }
781
782    /// Set the headers of this CSV parser manually as raw bytes.
783    ///
784    pub fn set_byte_headers(&mut self, headers: ByteRecord) {
785        self.set_headers_impl(Err(headers));
786    }
787
788    fn set_headers_impl(
789        &mut self,
790        headers: result::Result<StringRecord, ByteRecord>,
791    ) {
792        // If we have string headers, then get byte headers. But if we have
793        // byte headers, then get the string headers (or a UTF-8 error).
794        let (mut str_headers, mut byte_headers) = match headers {
795            Ok(string) => {
796                let bytes = string.clone().into_byte_record();
797                (Ok(string), bytes)
798            }
799            Err(bytes) => {
800                match StringRecord::from_byte_record(bytes.clone()) {
801                    Ok(str_headers) => (Ok(str_headers), bytes),
802                    Err(err) => (Err(err.utf8_error().clone()), bytes),
803                }
804            }
805        };
806        if self.state.trim.should_trim_headers() {
807            if let Ok(ref mut str_headers) = str_headers.as_mut() {
808                str_headers.trim();
809            }
810            byte_headers.trim();
811        }
812        self.state.headers = Some(Headers {
813            byte_record: byte_headers,
814            string_record: str_headers,
815        });
816    }
817
818    /// Read a single row into the given record. Returns false when no more
819    /// records could be read.
820    pub async fn read_record(&mut self, record: &mut StringRecord) -> Result<bool> {
821        let result = record.read(self).await;
822        // We need to trim again because trimming string records includes
823        // Unicode whitespace. (ByteRecord trimming only includes ASCII
824        // whitespace.)
825        if self.state.trim.should_trim_fields() {
826            record.trim();
827        }
828        result
829    }
830
831    /// Read a single row into the given byte record. Returns false when no
832    /// more records could be read.
833    pub async fn read_byte_record(
834        &mut self,
835        record: &mut ByteRecord,
836    ) -> Result<bool> {
837        if !self.state.seeked && !self.state.has_headers && !self.state.first {
838            // If the caller indicated "no headers" and we haven't yielded the
839            // first record yet, then we should yield our header row if we have
840            // one.
841            if let Some(ref headers) = self.state.headers {
842                self.state.first = true;
843                record.clone_from(&headers.byte_record);
844                if self.state.trim.should_trim_fields() {
845                    record.trim();
846                }
847                return Ok(!record.is_empty());
848            }
849        }
850        let ok = self.read_byte_record_impl(record).await?;
851        self.state.first = true;
852        if !self.state.seeked && self.state.headers.is_none() {
853            self.set_headers_impl(Err(record.clone()));
854            // If the end user indicated that we have headers, then we should
855            // never return the first row. Instead, we should attempt to
856            // read and return the next one.
857            if self.state.has_headers {
858                let result = self.read_byte_record_impl(record).await;
859                if self.state.trim.should_trim_fields() {
860                    record.trim();
861                }
862                return result;
863            }
864        } else if self.state.trim.should_trim_fields() {
865            record.trim();
866        }
867        Ok(ok)
868    }
869
    /// Read a byte record from the underlying CSV reader, without accounting
    /// for headers.
    ///
    /// `record` is cleared and stamped with the reader's current position
    /// before parsing starts. Returns `Ok(true)` when a record was parsed and
    /// `Ok(false)` when the reader is (or becomes) exhausted. An error is
    /// returned when filling the input buffer fails or when `add_record`
    /// fails.
    #[inline(always)]
    async fn read_byte_record_impl(
        &mut self,
        record: &mut ByteRecord,
    ) -> Result<bool> {
        use csv_core::ReadRecordResult::*;

        record.clear();
        record.set_position(Some(self.state.cur_pos.clone()));
        // An earlier call may already have hit end of input or an I/O error.
        // After an I/O error reading only stops when `end_on_io_error` is set;
        // otherwise another read is attempted.
        match self.state.eof {
            ReaderEofState::Eof => return Ok(false),
            ReaderEofState::IOError => {
                if self.state.end_on_io_error { return Ok(false) }
            },
            ReaderEofState::NotEof => {}
        }
        // Running offsets into the record's field and field-end buffers, so
        // that each parser call continues writing where the last one stopped.
        let (mut outlen, mut endlen) = (0, 0);
        loop {
            let (res, nin, nout, nend) = {
                // Make sure buffered input is available; a failure is
                // remembered in the EOF state before the error is returned.
                if let Err(err) = FillBuf::new(&mut self.rdr).await {
                    self.state.eof = ReaderEofState::IOError;
                    return Err(err.into());
                }
                let (fields, ends) = record.as_parts();
                self.core.read_record(
                    self.rdr.buffer(),
                    &mut fields[outlen..],
                    &mut ends[endlen..],
                )
            };
            // Mark the bytes the parser used as consumed and advance the
            // tracked byte offset and line number accordingly.
            Pin::new(&mut self.rdr).consume(nin);
            let byte = self.state.cur_pos.byte();
            self.state
                .cur_pos
                .set_byte(byte + nin as u64)
                .set_line(self.core.line());
            outlen += nout;
            endlen += nend;
            match res {
                // Parser needs more input: loop to refill the buffer.
                InputEmpty => continue,
                // One of the record's buffers is full: grow it and retry.
                OutputFull => {
                    record.expand_fields();
                    continue;
                }
                OutputEndsFull => {
                    record.expand_ends();
                    continue;
                }
                Record => {
                    record.set_len(endlen);
                    self.state.add_record(record)?;
                    return Ok(true);
                }
                End => {
                    self.state.eof = ReaderEofState::Eof;
                    return Ok(false);
                }
            }
        }
    }
932
933    /// Return the current position of this CSV reader.
934    ///
935    #[inline]
936    pub fn position(&self) -> &Position {
937        &self.state.cur_pos
938    }
939
940    /// Returns true if and only if this reader has been exhausted.
941    ///
942    pub fn is_done(&self) -> bool {
943        self.state.eof != ReaderEofState::NotEof
944    }
945
946    /// Returns true if and only if this reader has been configured to
947    /// interpret the first record as a header record.
948    pub fn has_headers(&self) -> bool {
949        self.state.has_headers
950    }
951
952    /// Returns a reference to the underlying reader.
953    pub fn get_ref(&self) -> &R {
954        self.rdr.get_ref()
955    }
956
957    /// Returns a mutable reference to the underlying reader.
958    pub fn get_mut(&mut self) -> &mut R {
959        self.rdr.get_mut()
960    }
961
962    /// Unwraps this CSV reader, returning the underlying reader.
963    ///
964    /// Note that any leftover data inside this reader's internal buffer is
965    /// lost.
966    pub fn into_inner(self) -> R {
967        self.rdr.into_inner()
968    }
969}
970
971impl<R: io::AsyncRead + io::AsyncSeek + Unpin> AsyncReaderImpl<R> {
972    /// Seeks the underlying reader to the position given.
973    ///
974    pub async fn seek(&mut self, pos: Position) -> Result<()> {
975        self.byte_headers().await?;
976        self.state.seeked = true;
977        if pos.byte() == self.state.cur_pos.byte() {
978            return Ok(());
979        }
980        self.rdr.seek(io::SeekFrom::Start(pos.byte())).await?;
981        self.core.reset();
982        self.core.set_line(pos.line());
983        self.state.cur_pos = pos;
984        self.state.eof = ReaderEofState::NotEof;
985        Ok(())
986    }
987
988    /// This is like `seek`, but provides direct control over how the seeking
989    /// operation is performed via `io::SeekFrom`.
990    pub async fn seek_raw(
991        &mut self,
992        seek_from: io::SeekFrom,
993        pos: Position,
994    ) -> Result<()> {
995        self.byte_headers().await?;
996        self.state.seeked = true;
997        self.rdr.seek(seek_from).await?;
998        self.core.reset();
999        self.core.set_line(pos.line());
1000        self.state.cur_pos = pos;
1001        self.state.eof = ReaderEofState::NotEof;
1002        Ok(())
1003    }
1004
1005    /// Seeks the underlying reader to first data record.
1006    ///
1007    #[cfg(feature = "tokio")]
1008    pub async fn rewind(&mut self) -> Result<()> {
1009        self.byte_headers().await?;
1010        self.state.seeked = false;
1011        self.state.headers = None;
1012        self.state.first = false;
1013        if self.state.cur_pos.byte() == 0 {
1014            return Ok(());
1015        }
1016        self.rdr.rewind().await?;
1017        self.core.reset();
1018        self.core.set_line(1);
1019        self.state.cur_pos.set_byte(0).set_line(1).set_record(0);
1020        self.state.eof = ReaderEofState::NotEof;
1021        Ok(())
1022    }
1023    #[cfg(not(feature = "tokio"))]
1024    pub async fn rewind(&mut self) -> Result<()> {
1025        self.byte_headers().await?;
1026        self.state.seeked = false;
1027        self.state.headers = None;
1028        self.state.first = false;
1029        if self.state.cur_pos.byte() == 0 {
1030            return Ok(());
1031        }
1032        self.rdr.seek(io::SeekFrom::Start(0)).await?;
1033        self.core.reset();
1034        self.core.set_line(1);
1035        self.state.cur_pos.set_byte(0).set_line(1).set_record(0);
1036        self.state.eof = ReaderEofState::NotEof;
1037        Ok(())
1038    }
1039}
1040
1041
1042//-//////////////////////////////////////////////////////////////////////////////////////////////
1043//-//////////////////////////////////////////////////////////////////////////////////////////////
1044
1045async fn read_record_borrowed<R>(
1046    rdr: &mut AsyncReaderImpl<R>,
1047    mut rec: StringRecord,
1048) -> (Option<Result<StringRecord>>, &mut AsyncReaderImpl<R>, StringRecord)
1049where
1050    R: io::AsyncRead + Unpin
1051{
1052    let result = match rdr.read_record(&mut rec).await {
1053        Err(err) => Some(Err(err)),
1054        Ok(true) => Some(Ok(rec.clone())),
1055        Ok(false) => None,
1056    };
1057
1058    (result, rdr, rec)
1059}
1060
1061/// A borrowed stream of records as strings.
1062///
1063/// The lifetime parameter `'r` refers to the lifetime of the underlying
1064/// CSV `Reader`.
1065#[allow(clippy::type_complexity)]
1066pub struct StringRecordsStream<'r, R>
1067where
1068    R: io::AsyncRead + Unpin + Send
1069{
1070    fut: Option<
1071        Pin<
1072            Box<
1073                dyn Future<
1074                        Output = (
1075                            Option<Result<StringRecord>>,
1076                            &'r mut AsyncReaderImpl<R>,
1077                            StringRecord,
1078                        ),
1079                    > + Send + 'r,
1080            >,
1081        >,
1082    >,
1083}
1084
1085impl<'r, R> StringRecordsStream<'r, R>
1086where
1087    R: io::AsyncRead + Unpin + Send
1088{
1089    fn new(rdr: &'r mut AsyncReaderImpl<R>) -> Self {
1090        Self {
1091            fut: Some(Pin::from(Box::new(read_record_borrowed(
1092                rdr,
1093                StringRecord::new(),
1094            )))),
1095        }
1096    }
1097}
1098
1099impl<'r, R> Stream for StringRecordsStream<'r, R>
1100where
1101    R: io::AsyncRead + Unpin + Send
1102{
1103    type Item = Result<StringRecord>;
1104
1105    fn poll_next(
1106        mut self: Pin<&mut Self>,
1107        cx: &mut Context,
1108    ) -> Poll<Option<Self::Item>> {
1109        if let Some(fut) = self.fut.as_mut() {
1110            match fut.as_mut().poll(cx) {
1111                Poll::Ready((result, rdr, rec)) => {
1112                    if result.is_some() {
1113                        self.fut =
1114                            Some(Pin::from(Box::new(read_record_borrowed(rdr, rec))));
1115                    } else {
1116                        self.fut = None;
1117                    }
1118                    Poll::Ready(result)
1119                }
1120                Poll::Pending => Poll::Pending,
1121            }
1122        } else {
1123            Poll::Ready(None)
1124        }
1125    }
1126}
1127
1128//-//////////////////////////////////////////////////////////////////////////////////////////////
1129//-//////////////////////////////////////////////////////////////////////////////////////////////
1130
1131async fn read_record<R>(
1132    mut rdr: AsyncReaderImpl<R>,
1133    mut rec: StringRecord,
1134) -> (Option<Result<StringRecord>>, AsyncReaderImpl<R>, StringRecord)
1135where
1136    R: io::AsyncRead + Unpin
1137{
1138    let result = match rdr.read_record(&mut rec).await {
1139        Err(err) => Some(Err(err)),
1140        Ok(true) => Some(Ok(rec.clone())),
1141        Ok(false) => None,
1142    };
1143
1144    (result, rdr, rec)
1145}
1146
1147/// An owned stream of records as strings.
1148#[allow(clippy::type_complexity)]
1149pub struct StringRecordsIntoStream<'r, R>
1150where
1151    R: io::AsyncRead + Unpin + Send
1152{
1153    fut: Option<
1154        Pin<
1155            Box<
1156                dyn Future<
1157                    Output = (
1158                        Option<Result<StringRecord>>,
1159                        AsyncReaderImpl<R>,
1160                        StringRecord,
1161                    ),
1162                > + Send + 'r,
1163            >,
1164        >,
1165    >,
1166}
1167
1168impl<'r, R> StringRecordsIntoStream<'r, R>
1169where
1170    R: io::AsyncRead + Unpin + Send + 'r
1171{
1172    fn new(rdr: AsyncReaderImpl<R>) -> Self {
1173        Self {
1174            fut: Some(Pin::from(Box::new(read_record(
1175                rdr,
1176                StringRecord::new(),
1177            )))),
1178        }
1179    }
1180}
1181
1182impl<'r, R> Stream for StringRecordsIntoStream<'r, R>
1183where
1184    R: io::AsyncRead + Unpin + Send + 'r
1185{
1186    type Item = Result<StringRecord>;
1187
1188    fn poll_next(
1189        mut self: Pin<&mut Self>,
1190        cx: &mut Context,
1191    ) -> Poll<Option<Self::Item>> {
1192        if let Some(fut) = self.fut.as_mut() {
1193            match fut.as_mut().poll(cx) {
1194                Poll::Ready((result, rdr, rec)) => {
1195                    if result.is_some() {
1196                        self.fut =
1197                            Some(Pin::from(Box::new(read_record(rdr, rec))));
1198                    } else {
1199                        self.fut = None;
1200                    }
1201
1202                    Poll::Ready(result)
1203                }
1204                Poll::Pending => Poll::Pending,
1205            }
1206        } else {
1207            Poll::Ready(None)
1208        }
1209    }
1210}
1211
1212//-//////////////////////////////////////////////////////////////////////////////////////////////
1213//-//////////////////////////////////////////////////////////////////////////////////////////////
1214
1215async fn read_byte_record_borrowed<R>(
1216    rdr: &mut AsyncReaderImpl<R>,
1217    mut rec: ByteRecord,
1218) -> (Option<Result<ByteRecord>>, &mut AsyncReaderImpl<R>, ByteRecord)
1219where
1220    R: io::AsyncRead + Unpin,
1221{
1222    let result = match rdr.read_byte_record(&mut rec).await {
1223        Err(err) => Some(Err(err)),
1224        Ok(true) => Some(Ok(rec.clone())),
1225        Ok(false) => None,
1226    };
1227
1228    (result, rdr, rec)
1229}
1230
1231/// A borrowed stream of records as raw bytes.
1232///
1233/// The lifetime parameter `'r` refers to the lifetime of the underlying
1234/// CSV `Reader`.
1235#[allow(clippy::type_complexity)]
1236pub struct ByteRecordsStream<'r, R>
1237where
1238    R: io::AsyncRead + Unpin + Send,
1239{
1240    fut: Option<
1241        Pin<
1242            Box<
1243                dyn Future<
1244                        Output = (
1245                            Option<Result<ByteRecord>>,
1246                            &'r mut AsyncReaderImpl<R>,
1247                            ByteRecord,
1248                        ),
1249                    > + Send + 'r,
1250            >,
1251        >,
1252    >,
1253}
1254
1255impl<'r, R> ByteRecordsStream<'r, R>
1256where
1257    R: io::AsyncRead + Unpin + Send + 'r,
1258{
1259    fn new(rdr: &'r mut AsyncReaderImpl<R>) -> Self {
1260        Self {
1261            fut: Some(Pin::from(Box::new(read_byte_record_borrowed(
1262                rdr,
1263                ByteRecord::new(),
1264            )))),
1265        }
1266    }
1267}
1268
1269impl<'r, R> Stream for ByteRecordsStream<'r, R>
1270where
1271    R: io::AsyncRead + Send + Unpin,
1272{
1273    type Item = Result<ByteRecord>;
1274
1275    fn poll_next(
1276        mut self: Pin<&mut Self>,
1277        cx: &mut Context,
1278    ) -> Poll<Option<Self::Item>> {
1279        if let Some(fut) = self.fut.as_mut() {
1280            match fut.as_mut().poll(cx) {
1281                Poll::Ready((result, rdr, rec)) => {
1282                    if result.is_some() {
1283                        self.fut =
1284                            Some(Pin::from(Box::new(read_byte_record_borrowed(rdr, rec))));
1285                    } else {
1286                        self.fut = None;
1287                    }
1288                    Poll::Ready(result)
1289                }
1290                Poll::Pending => Poll::Pending,
1291            }
1292        } else {
1293            Poll::Ready(None)
1294        }
1295    }
1296}
1297
1298//-//////////////////////////////////////////////////////////////////////////////////////////////
1299//-//////////////////////////////////////////////////////////////////////////////////////////////
1300
1301async fn read_byte_record<R>(
1302    mut rdr: AsyncReaderImpl<R>,
1303    mut rec: ByteRecord,
1304) -> (Option<Result<ByteRecord>>, AsyncReaderImpl<R>, ByteRecord)
1305where
1306    R: io::AsyncRead + Unpin
1307{
1308    let result = match rdr.read_byte_record(&mut rec).await {
1309        Err(err) => Some(Err(err)),
1310        Ok(true) => Some(Ok(rec.clone())),
1311        Ok(false) => None,
1312    };
1313
1314    (result, rdr, rec)
1315}
1316
1317/// An owned stream of records as raw bytes.
1318#[allow(clippy::type_complexity)]
1319pub struct ByteRecordsIntoStream<'r, R>
1320where
1321    R: io::AsyncRead + Unpin + Send
1322{
1323    fut: Option<
1324        Pin<
1325            Box<
1326                dyn Future<
1327                    Output = (
1328                        Option<Result<ByteRecord>>,
1329                        AsyncReaderImpl<R>,
1330                        ByteRecord,
1331                    ),
1332                > + Send + 'r,
1333            >,
1334        >,
1335    >,
1336}
1337
1338impl<'r, R> ByteRecordsIntoStream<'r, R>
1339where
1340    R: io::AsyncRead + Send + Unpin + 'r
1341{
1342    fn new(rdr: AsyncReaderImpl<R>) -> Self {
1343        Self {
1344            fut: Some(Pin::from(Box::new(read_byte_record(
1345                rdr,
1346                ByteRecord::new(),
1347            )))),
1348        }
1349    }
1350}
1351
1352impl<'r, R> Stream for ByteRecordsIntoStream<'r, R>
1353where
1354    R: io::AsyncRead + Send + Unpin + 'r
1355{
1356    type Item = Result<ByteRecord>;
1357
1358    fn poll_next(
1359        mut self: Pin<&mut Self>,
1360        cx: &mut Context,
1361    ) -> Poll<Option<Self::Item>> {
1362        if let Some(fut) = self.fut.as_mut() {
1363            match fut.as_mut().poll(cx) {
1364                Poll::Ready((result, rdr, rec)) => {
1365                    if result.is_some() {
1366                        self.fut =
1367                            Some(Pin::from(Box::new(read_byte_record(rdr, rec))));
1368                    } else {
1369                        self.fut = None;
1370                    }
1371                    Poll::Ready(result)
1372                }
1373                Poll::Pending => Poll::Pending,
1374            }
1375        } else {
1376            Poll::Ready(None)
1377        }
1378    }
1379}
1380
1381//-//////////////////////////////////////////////////////////////////////////////////////////////
1382//-//////////////////////////////////////////////////////////////////////////////////////////////
1383
1384cfg_if::cfg_if! {
1385if #[cfg(feature = "with_serde")] {
1386
1387async fn deserialize_record_borrowed<R, D: DeserializeOwned>(
1388    rdr: &mut AsyncReaderImpl<R>,
1389    headers: Option<StringRecord>,
1390    mut rec: StringRecord,
1391) -> (Option<Result<D>>, &mut AsyncReaderImpl<R>, Option<StringRecord>, StringRecord)
1392where
1393    R: io::AsyncRead + Unpin
1394{
1395    let result = match rdr.read_record(&mut rec).await {
1396        Err(err) => Some(Err(err)),
1397        Ok(true) => Some(rec.deserialize(headers.as_ref())),
1398        Ok(false) => None,
1399    };
1400
1401    (result, rdr, headers, rec)
1402}
1403
1404/// A borrowed stream of deserialized records.
1405///
1406/// The lifetime parameter `'r` refers to the lifetime of the underlying CSV `Reader`.
1407/// type, and `D` refers to the type that this stream will deserialize a record into.
1408#[allow(clippy::type_complexity)]
1409pub struct DeserializeRecordsStream<'r, R, D>
1410where
1411    R: io::AsyncRead + Unpin + Send
1412{
1413    header_fut: Option<
1414        Pin<
1415            Box<
1416                dyn Future<
1417                    Output = (
1418                        Result<StringRecord>,
1419                        &'r mut AsyncReaderImpl<R>,
1420                    )
1421                > + Send + 'r,
1422            >,
1423        >,
1424    >,
1425    rec_fut: Option<
1426        Pin<
1427            Box<
1428                dyn Future<
1429                    Output = (
1430                        Option<Result<D>>,
1431                        &'r mut AsyncReaderImpl<R>,
1432                        Option<StringRecord>,
1433                        StringRecord,
1434                    )
1435                > + Send + 'r,
1436            >,
1437        >,
1438    >,
1439}
1440
1441impl<'r, R, D: DeserializeOwned + 'r> DeserializeRecordsStream<'r, R, D>
1442where
1443    R: io::AsyncRead + Unpin + Send
1444{
1445    fn new(rdr: &'r mut AsyncReaderImpl<R>) -> Self {
1446        let has_headers = rdr.has_headers();
1447        if has_headers {
1448            Self {
1449                header_fut: Some(Pin::from(Box::new(
1450                    async{ (rdr.headers().await.cloned(), rdr) }
1451                ))),
1452                rec_fut: None,
1453            }
1454        } else {
1455            Self {
1456                header_fut: None,
1457                rec_fut: Some(Pin::from(Box::new(
1458                    deserialize_record_borrowed(rdr, None, StringRecord::new())
1459                ))),
1460            }
1461        }
1462    }
1463}
1464
1465impl<'r, R, D: DeserializeOwned + 'r> Stream for DeserializeRecordsStream<'r, R, D>
1466where
1467    R: io::AsyncRead + Unpin + Send
1468{
1469    type Item = Result<D>;
1470
1471    fn poll_next(
1472        mut self: Pin<&mut Self>,
1473        cx: &mut Context,
1474    ) -> Poll<Option<Self::Item>> {
1475        if let Some(header_fut) = &mut self.header_fut {
1476            match header_fut.as_mut().poll(cx) {
1477                Poll::Ready((Ok(headers), rdr)) => {
1478                    self.header_fut = None;
1479                    self.rec_fut = Some(Pin::from(Box::new(
1480                        deserialize_record_borrowed(rdr, Some(headers), StringRecord::new()),
1481                    )));
1482                    cx.waker().wake_by_ref();
1483                    Poll::Pending
1484                },
1485                Poll::Ready((Err(err), rdr)) => {
1486                    self.header_fut = None;
1487                    self.rec_fut = Some(Pin::from(Box::new(
1488                        deserialize_record_borrowed(rdr, None, StringRecord::new()),
1489                    )));
1490                    Poll::Ready(Some(Err(err)))
1491                },
1492                Poll::Pending => Poll::Pending,
1493            }
1494        } else if let Some(fut) = self.rec_fut.as_mut() {
1495            match fut.as_mut().poll(cx) {
1496                Poll::Ready((result, rdr, headers, rec)) => {
1497                    if result.is_some() {
1498                        self.rec_fut = Some(Pin::from(Box::new(
1499                            deserialize_record_borrowed(rdr, headers, rec),
1500                        )));
1501                    } else {
1502                        self.rec_fut = None;
1503                    }
1504                    Poll::Ready(result)
1505                }
1506                Poll::Pending => Poll::Pending,
1507            }
1508        } else {
1509            Poll::Ready(None)
1510        }
1511    }
1512}
1513
1514//-//////////////////////////////////////////////////////////////////////////////////////////////
1515//-//////////////////////////////////////////////////////////////////////////////////////////////
1516
1517async fn deserialize_record_with_pos_borrowed<R, D: DeserializeOwned>(
1518    rdr: &mut AsyncReaderImpl<R>,
1519    headers: Option<StringRecord>,
1520    mut rec: StringRecord,
1521) -> (Option<Result<D>>, Position, &mut AsyncReaderImpl<R>, Option<StringRecord>, StringRecord)
1522where
1523    R: io::AsyncRead + Unpin
1524{
1525    let pos = rdr.position().clone();
1526    let result = match rdr.read_record(&mut rec).await {
1527        Err(err) => Some(Err(err)),
1528        Ok(true) => Some(rec.deserialize(headers.as_ref())),
1529        Ok(false) => None,
1530    };
1531
1532    (result, pos, rdr, headers, rec)
1533}
1534
1535/// A borrowed stream of pairs: deserialized records and position in stream before reading record.
1536///
1537/// The lifetime parameter `'r` refers to the lifetime of the underlying CSV `Reader`.
1538/// type, and `D` refers to the type that this stream will deserialize a record into.
1539#[allow(clippy::type_complexity)]
1540pub struct DeserializeRecordsStreamPos<'r, R, D>
1541where
1542    R: io::AsyncRead + Unpin + Send
1543{
1544    header_fut: Option<
1545        Pin<
1546            Box<
1547                dyn Future<
1548                    Output = (
1549                        Result<StringRecord>,
1550                        &'r mut AsyncReaderImpl<R>,
1551                    )
1552                > + Send + 'r,
1553            >,
1554        >,
1555    >,
1556    rec_fut: Option<
1557        Pin<
1558            Box<
1559                dyn Future<
1560                    Output = (
1561                        Option<Result<D>>,
1562                        Position,
1563                        &'r mut AsyncReaderImpl<R>,
1564                        Option<StringRecord>,
1565                        StringRecord,
1566                    )
1567                > + Send + 'r,
1568            >,
1569        >,
1570    >,
1571}
1572
1573impl<'r, R, D: DeserializeOwned + 'r> DeserializeRecordsStreamPos<'r, R, D>
1574where
1575    R: io::AsyncRead + Unpin + Send
1576{
1577    fn new(rdr: &'r mut AsyncReaderImpl<R>) -> Self {
1578        let has_headers = rdr.has_headers();
1579        if has_headers {
1580            Self {
1581                header_fut: Some(Pin::from(Box::new(
1582                    async{ (rdr.headers().await.cloned(), rdr) }
1583                ))),
1584                rec_fut: None,
1585            }
1586        } else {
1587            Self {
1588                header_fut: None,
1589                rec_fut: Some(Pin::from(Box::new(
1590                    deserialize_record_with_pos_borrowed(rdr, None, StringRecord::new())
1591                ))),
1592            }
1593        }
1594    }
1595}
1596
1597impl<'r, R, D: DeserializeOwned + 'r> Stream for DeserializeRecordsStreamPos<'r, R, D>
1598where
1599    R: io::AsyncRead + Unpin + Send
1600{
1601    type Item = (Result<D>, Position);
1602
1603    fn poll_next(
1604        mut self: Pin<&mut Self>,
1605        cx: &mut Context,
1606    ) -> Poll<Option<Self::Item>> {
1607        if let Some(header_fut) = &mut self.header_fut {
1608            match header_fut.as_mut().poll(cx) {
1609                Poll::Ready((Ok(headers), rdr)) => {
1610                    self.header_fut = None;
1611                    self.rec_fut = Some(Pin::from(Box::new(
1612                        deserialize_record_with_pos_borrowed(rdr, Some(headers), StringRecord::new()),
1613                    )));
1614                    cx.waker().wake_by_ref();
1615                    Poll::Pending
1616                },
1617                Poll::Ready((Err(err), rdr)) => {
1618                    self.header_fut = None;
1619                    let pos = rdr.position().clone();
1620                    self.rec_fut = Some(Pin::from(Box::new(
1621                        deserialize_record_with_pos_borrowed(rdr, None, StringRecord::new()),
1622                    )));
1623                    Poll::Ready(Some((Err(err), pos)))
1624                },
1625                Poll::Pending => Poll::Pending,
1626            }
1627        } else if let Some(fut) = self.rec_fut.as_mut() {
1628            match fut.as_mut().poll(cx) {
1629                Poll::Ready((result, pos, rdr, headers, rec)) => {
1630                    if let Some(result) = result {
1631                        self.rec_fut = Some(Pin::from(Box::new(
1632                            deserialize_record_with_pos_borrowed(rdr, headers, rec),
1633                        )));
1634                        Poll::Ready(Some((result, pos)))
1635                    } else {
1636                        self.rec_fut = None;
1637                        Poll::Ready(None)
1638                    }
1639                }
1640                Poll::Pending => Poll::Pending,
1641            }
1642        } else {
1643            Poll::Ready(None)
1644        }
1645    }
1646}
1647
1648//-//////////////////////////////////////////////////////////////////////////////////////////////
1649//-//////////////////////////////////////////////////////////////////////////////////////////////
1650
1651async fn deserialize_record<R, D: DeserializeOwned>(
1652    mut rdr: AsyncReaderImpl<R>,
1653    headers: Option<StringRecord>,
1654    mut rec: StringRecord,
1655) -> (Option<Result<D>>, AsyncReaderImpl<R>, Option<StringRecord>, StringRecord)
1656where
1657    R: io::AsyncRead + Unpin
1658{
1659    let result = match rdr.read_record(&mut rec).await {
1660        Err(err) => Some(Err(err)),
1661        Ok(true) => Some(rec.deserialize(headers.as_ref())),
1662        Ok(false) => None,
1663    };
1664
1665    (result, rdr, headers, rec)
1666}
1667
1668/// A owned stream of deserialized records.
1669///
1670/// The lifetime parameter `'r` refers to the lifetime of the underlying CSV `Reader`.
1671/// type, and `D` refers to the type that this stream will deserialize a record into.
1672#[allow(clippy::type_complexity)]
1673pub struct DeserializeRecordsIntoStream<'r, R, D>
1674where
1675    R: io::AsyncRead + Unpin + Send
1676{
1677    header_fut: Option<
1678        Pin<
1679            Box<
1680                dyn Future<
1681                    Output = (
1682                        Result<StringRecord>,
1683                        AsyncReaderImpl<R>,
1684                    )
1685                > + Send + 'r,
1686            >,
1687        >,
1688    >,
1689    rec_fut: Option<
1690        Pin<
1691            Box<
1692                dyn Future<
1693                    Output = (
1694                        Option<Result<D>>,
1695                        AsyncReaderImpl<R>,
1696                        Option<StringRecord>,
1697                        StringRecord,
1698                    )
1699                > + Send + 'r,
1700            >,
1701        >,
1702    >,
1703}
1704
1705impl<'r, R, D: DeserializeOwned + 'r> DeserializeRecordsIntoStream<'r, R, D>
1706where
1707    R: io::AsyncRead + Unpin + Send + 'r
1708{
1709    fn new(mut rdr: AsyncReaderImpl<R>) -> Self {
1710        let has_headers = rdr.has_headers();
1711        if has_headers {
1712            Self {
1713                header_fut: Some(Pin::from(Box::new(
1714                    async{ (rdr.headers().await.cloned(), rdr) }
1715                ))),
1716                rec_fut: None,
1717            }
1718        } else {
1719            Self {
1720                header_fut: None,
1721                rec_fut: Some(Pin::from(Box::new(
1722                    deserialize_record(rdr, None, StringRecord::new())
1723                ))),
1724            }
1725        }
1726    }
1727}
1728
1729impl<'r, R, D: DeserializeOwned + 'r> Stream for DeserializeRecordsIntoStream<'r, R, D>
1730where
1731    R: io::AsyncRead + Unpin + Send + 'r
1732{
1733    type Item = Result<D>;
1734
1735    fn poll_next(
1736        mut self: Pin<&mut Self>,
1737        cx: &mut Context,
1738    ) -> Poll<Option<Self::Item>> {
1739        if let Some(header_fut) = &mut self.header_fut {
1740            match header_fut.as_mut().poll(cx) {
1741                Poll::Ready((Ok(headers), rdr)) => {
1742                    self.header_fut = None;
1743                    self.rec_fut = Some(Pin::from(Box::new(
1744                        deserialize_record(rdr, Some(headers), StringRecord::new()),
1745                    )));
1746                    cx.waker().wake_by_ref();
1747                    Poll::Pending
1748                },
1749                Poll::Ready((Err(err), rdr)) => {
1750                    self.header_fut = None;
1751                    self.rec_fut = Some(Pin::from(Box::new(
1752                        deserialize_record(rdr, None, StringRecord::new()),
1753                    )));
1754                    Poll::Ready(Some(Err(err)))
1755                },
1756                Poll::Pending => Poll::Pending,
1757            }
1758        } else if let Some(fut) = self.rec_fut.as_mut() {
1759            match fut.as_mut().poll(cx) {
1760                Poll::Ready((result, rdr, headers, rec)) => {
1761                    if result.is_some() {
1762                        self.rec_fut = Some(Pin::from(Box::new(
1763                            deserialize_record(rdr, headers, rec),
1764                        )));
1765                    } else {
1766                        self.rec_fut = None;
1767                    }
1768                    Poll::Ready(result)
1769                }
1770                Poll::Pending => Poll::Pending,
1771            }
1772        } else {
1773            Poll::Ready(None)
1774        }
1775    }
1776}
1777
1778//-//////////////////////////////////////////////////////////////////////////////////////////////
1779//-//////////////////////////////////////////////////////////////////////////////////////////////
1780
1781async fn deserialize_record_with_pos<R, D: DeserializeOwned>(
1782    mut rdr: AsyncReaderImpl<R>,
1783    headers: Option<StringRecord>,
1784    mut rec: StringRecord,
1785) -> (Option<Result<D>>, Position, AsyncReaderImpl<R>, Option<StringRecord>, StringRecord)
1786where
1787    R: io::AsyncRead + Unpin
1788{
1789    let pos = rdr.position().clone();
1790    let result = match rdr.read_record(&mut rec).await {
1791        Err(err) => Some(Err(err)),
1792        Ok(true) => Some(rec.deserialize(headers.as_ref())),
1793        Ok(false) => None,
1794    };
1795
1796    (result, pos, rdr, headers, rec)
1797}
1798
/// An owned stream of pairs: a deserialized record and the position in the
/// input captured before that record was read.
///
/// The lifetime parameter `'r` bounds the boxed futures held by this stream,
/// `R` refers to the underlying reader type, and `D` refers to the type that
/// this stream will deserialize a record into.
#[allow(clippy::type_complexity)]
pub struct DeserializeRecordsIntoStreamPos<'r, R, D>
where
    R: io::AsyncRead + Unpin + Send
{
    // Future that reads the header record. It resolves to the header result
    // together with the reader, so the reader can be moved into the first
    // record future. `None` when no header read is pending.
    header_fut: Option<
        Pin<
            Box<
                dyn Future<
                    Output = (
                        Result<StringRecord>,
                        AsyncReaderImpl<R>,
                    )
                > + Send + 'r,
            >,
        >,
    >,
    // Future that reads and deserializes the next record. It resolves to the
    // outcome (`None` when no record was read), the position captured before
    // the read, and the reader, headers and record buffer, which are handed
    // back for the next read. `None` when no record read is pending.
    rec_fut: Option<
        Pin<
            Box<
                dyn Future<
                    Output = (
                        Option<Result<D>>,
                        Position,
                        AsyncReaderImpl<R>,
                        Option<StringRecord>,
                        StringRecord,
                    )
                > + Send + 'r,
            >,
        >,
    >,
}
1836
1837impl<'r, R, D: DeserializeOwned + 'r> DeserializeRecordsIntoStreamPos<'r, R, D>
1838where
1839    R: io::AsyncRead + Unpin + Send + 'r
1840{
1841    fn new(mut rdr: AsyncReaderImpl<R>) -> Self {
1842        let has_headers = rdr.has_headers();
1843        if has_headers {
1844            Self {
1845                header_fut: Some(Pin::from(Box::new(
1846                    async{ (rdr.headers().await.cloned(), rdr) }
1847                ))),
1848                rec_fut: None,
1849            }
1850        } else {
1851            Self {
1852                header_fut: None,
1853                rec_fut: Some(Pin::from(Box::new(
1854                    deserialize_record_with_pos(rdr, None, StringRecord::new())
1855                ))),
1856            }
1857        }
1858    }
1859}
1860
1861impl<'r, R, D: DeserializeOwned + 'r> Stream for DeserializeRecordsIntoStreamPos<'r, R, D>
1862where
1863    R: io::AsyncRead + Unpin + Send + 'r
1864{
1865    type Item = (Result<D>, Position);
1866
1867    fn poll_next(
1868        mut self: Pin<&mut Self>,
1869        cx: &mut Context,
1870    ) -> Poll<Option<Self::Item>> {
1871        if let Some(header_fut) = &mut self.header_fut {
1872            match header_fut.as_mut().poll(cx) {
1873                Poll::Ready((Ok(headers), rdr)) => {
1874                    self.header_fut = None;
1875                    self.rec_fut = Some(Pin::from(Box::new(
1876                        deserialize_record_with_pos(rdr, Some(headers), StringRecord::new()),
1877                    )));
1878                    cx.waker().wake_by_ref();
1879                    Poll::Pending
1880                },
1881                Poll::Ready((Err(err), rdr)) => {
1882                    self.header_fut = None;
1883                    let pos = rdr.position().clone();
1884                    self.rec_fut = Some(Pin::from(Box::new(
1885                        deserialize_record_with_pos(rdr, None, StringRecord::new()),
1886                    )));
1887                    Poll::Ready(Some((Err(err), pos)))
1888                },
1889                Poll::Pending => Poll::Pending,
1890            }
1891        } else if let Some(fut) = self.rec_fut.as_mut() {
1892            match fut.as_mut().poll(cx) {
1893                Poll::Ready((result, pos, rdr, headers, rec)) => {
1894                    if let Some(result) = result {
1895                        self.rec_fut = Some(Pin::from(Box::new(
1896                            deserialize_record_with_pos(rdr, headers, rec),
1897                        )));
1898                        Poll::Ready(Some((result, pos)))
1899                    } else {
1900                        self.rec_fut = None;
1901                        Poll::Ready(None)
1902                    }
1903                }
1904                Poll::Pending => Poll::Pending,
1905            }
1906        } else {
1907            Poll::Ready(None)
1908        }
1909    }
1910}
1911
1912}} // fi #[cfg(feature = "with_serde")]
1913