csv_async/async_readers/ardr_tokio.rs

1use tokio::io;
2
3use crate::AsyncReaderBuilder;
4use crate::byte_record::{ByteRecord, Position};
5use crate::error::Result;
6use crate::string_record::StringRecord;
7use super::{
8    AsyncReaderImpl,
9    StringRecordsStream, StringRecordsIntoStream,
10    ByteRecordsStream, ByteRecordsIntoStream,
11};
12
13impl AsyncReaderBuilder {
14    /// Build a CSV reader from this configuration that reads data from `rdr`.
15    ///
16    /// Note that the CSV reader is buffered automatically, so you should not
17    /// wrap `rdr` in a buffered reader.
18    ///
19    /// # Example
20    ///
21    /// ```
22    /// use std::error::Error;
23    /// use csv_async::AsyncReaderBuilder;
24    /// use tokio_stream::StreamExt;
25    ///
26    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
27    /// async fn example() -> Result<(), Box<dyn Error>> {
28    ///     let data = "\
29    /// city,country,pop
30    /// Boston,United States,4628910
31    /// Concord,United States,42695
32    /// ";
33    ///     let mut rdr = AsyncReaderBuilder::new().create_reader(data.as_bytes());
34    ///     let mut records = rdr.into_records();
35    ///     while let Some(record) = records.next().await {
36    ///         println!("{:?}", record?);
37    ///     }
38    ///     Ok(())
39    /// }
40    /// ```
41    pub fn create_reader<R: io::AsyncRead + Unpin + Send>(&self, rdr: R) -> AsyncReader<R> {
42        AsyncReader::new(self, rdr)
43    }
44}
45
46/// A already configured CSV reader for `tokio` runtime.
47///
48/// A CSV reader takes as input CSV data and transforms that into standard Rust
49/// values. The reader reads CSV data is as a sequence of records,
50/// where a record is a sequence of fields and each field is a string.
51///
52/// # Configuration
53///
54/// A CSV reader has convenient constructor method `create_reader`.
55/// However, if you want to configure the CSV reader to use
56/// a different delimiter or quote character (among many other things), then
57/// you should use a [`AsyncReaderBuilder`](struct.AsyncReaderBuilder.html) to construct
58/// a `AsyncReader`. For example, to change the field delimiter:
59///
60/// ```
61/// use std::error::Error;
62/// use csv_async::AsyncReaderBuilder;
63/// use tokio_stream::StreamExt;
64///
65/// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
66/// async fn example() -> Result<(), Box<dyn Error>> {
67///     let data = "\
68/// city;country;pop
69/// Boston;United States;4628910
70/// ";
71///     let mut rdr = AsyncReaderBuilder::new()
72///         .delimiter(b';')
73///         .create_reader(data.as_bytes());
74///
75///     let mut records = rdr.records();
76///     assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
77///     Ok(())
78/// }
79/// ```
80///
81/// # Error handling
82///
83/// In general, CSV *parsing* does not ever return an error. That is, there is
84/// no such thing as malformed CSV data. Instead, this reader will prioritize
85/// finding a parse over rejecting CSV data that it does not understand. This
86/// choice was inspired by other popular CSV parsers, but also because it is
87/// pragmatic. CSV data varies wildly, so even if the CSV data is malformed,
88/// it might still be possible to work with the data. In the land of CSV, there
89/// is no "right" or "wrong," only "right" and "less right."
90///
91/// With that said, a number of errors can occur while reading CSV data:
92///
93/// * By default, all records in CSV data must have the same number of fields.
94///   If a record is found with a different number of fields than a prior
95///   record, then an error is returned. This behavior can be disabled by
96///   enabling flexible parsing via the `flexible` method on
97///   [`AsyncReaderBuilder`](struct.AsyncReaderBuilder.html).
98/// * When reading CSV data from a resource (like a file), it is possible for
99///   reading from the underlying resource to fail. This will return an error.
100///   For subsequent calls to the reader after encountering a such error
101///   (unless `seek` is used), it will behave as if end of file had been
102///   reached, in order to avoid running into infinite loops when still
103///   attempting to read the next record when one has errored.
104/// * When reading CSV data into `String` or `&str` fields (e.g., via a
105///   [`StringRecord`](struct.StringRecord.html)), UTF-8 is strictly
106///   enforced. If CSV data is invalid UTF-8, then an error is returned. If
107///   you want to read invalid UTF-8, then you should use the byte oriented
108///   APIs such as [`ByteRecord`](struct.ByteRecord.html). If you need explicit
109///   support for another encoding entirely, then you'll need to use another
110///   crate to transcode your CSV data to UTF-8 before parsing it.
111/// * When using Serde to deserialize CSV data into Rust types, it is possible
112///   for a number of additional errors to occur. For example, deserializing
113///   a field `xyz` into an `i32` field will result in an error.
114///
115/// For more details on the precise semantics of errors, see the
116/// [`Error`](enum.Error.html) type.
117#[derive(Debug)]
118pub struct AsyncReader<R>(AsyncReaderImpl<R>);
119
120impl<'r, R> AsyncReader<R>
121where
122    R: io::AsyncRead + Unpin + Send + 'r,
123{
124    /// Create a new CSV reader given a builder and a source of underlying
125    /// bytes.
126    fn new(builder: &AsyncReaderBuilder, rdr: R) -> AsyncReader<R> {
127        AsyncReader(AsyncReaderImpl::new(builder, rdr))
128    }
129
130    /// Create a new CSV parser with a default configuration for the given
131    /// reader.
132    ///
133    /// To customize CSV parsing, use a `ReaderBuilder`.
134    ///
135    /// # Example
136    ///
137    /// ```
138    /// use std::error::Error;
139    /// use csv_async::AsyncReader;
140    /// use tokio_stream::StreamExt;
141    ///
142    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
143    /// async fn example() -> Result<(), Box<dyn Error>> {
144    ///     let data = "\
145    /// city,country,pop
146    /// Boston,United States,4628910
147    /// Concord,United States,42695
148    /// ";
149    ///     let mut rdr = AsyncReader::from_reader(data.as_bytes());
150    ///     let mut records = rdr.into_records();
151    ///     while let Some(record) = records.next().await {
152    ///         println!("{:?}", record?);
153    ///     }
154    ///     Ok(())
155    /// }
156    /// ```
157    #[inline]
158    pub fn from_reader(rdr: R) -> AsyncReader<R> {
159        AsyncReaderBuilder::new().create_reader(rdr)
160    }
161
162    /// Returns a borrowed iterator over all records as strings.
163    ///
164    /// Each item yielded by this iterator is a `Result<StringRecord, Error>`.
165    /// Therefore, in order to access the record, callers must handle the
166    /// possibility of error (typically with `try!` or `?`).
167    ///
168    /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
169    /// default), then this does not include the first record.
170    ///
171    /// # Example
172    ///
173    /// ```
174    /// use std::error::Error;
175    /// use csv_async::AsyncReader;
176    /// use tokio_stream::StreamExt;
177    ///
178    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
179    /// async fn example() -> Result<(), Box<dyn Error>> {
180    ///     let data = "\
181    /// city,country,pop
182    /// Boston,United States,4628910
183    /// ";
184    ///     let mut rdr = AsyncReader::from_reader(data.as_bytes());
185    ///     let mut records = rdr.records();
186    ///     while let Some(record) = records.next().await {
187    ///         println!("{:?}", record?);
188    ///     }
189    ///     Ok(())
190    /// }
191    /// ```
192    #[inline]
193    pub fn records(&mut self) -> StringRecordsStream<R> {
194        StringRecordsStream::new(&mut self.0)
195    }
196
197    /// Returns an owned iterator over all records as strings.
198    ///
199    /// Each item yielded by this iterator is a `Result<StringRecord, Error>`.
200    /// Therefore, in order to access the record, callers must handle the
201    /// possibility of error (typically with `try!` or `?`).
202    ///
203    /// This is mostly useful when you want to return a CSV iterator or store
204    /// it somewhere.
205    ///
206    /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
207    /// default), then this does not include the first record.
208    ///
209    /// # Example
210    ///
211    /// ```
212    /// use std::error::Error;
213    /// use csv_async::AsyncReader;
214    /// use tokio_stream::StreamExt;
215    ///
216    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
217    /// async fn example() -> Result<(), Box<dyn Error>> {
218    ///     let data = "\
219    /// city,country,pop
220    /// Boston,United States,4628910
221    /// ";
222    ///     let rdr = AsyncReader::from_reader(data.as_bytes());
223    ///     let mut records = rdr.into_records();
224    ///     while let Some(record) = records.next().await {
225    ///         println!("{:?}", record?);
226    ///     }
227    ///     Ok(())
228    /// }
229    /// ```
230    #[inline]
231    pub fn into_records(self) -> StringRecordsIntoStream<'r, R> {
232        StringRecordsIntoStream::new(self.0)
233    }
234
235    /// Returns a borrowed iterator over all records as raw bytes.
236    ///
237    /// Each item yielded by this iterator is a `Result<ByteRecord, Error>`.
238    /// Therefore, in order to access the record, callers must handle the
239    /// possibility of error (typically with `try!` or `?`).
240    ///
241    /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
242    /// default), then this does not include the first record.
243    ///
244    /// # Example
245    ///
246    /// ```
247    /// use std::error::Error;
248    /// use csv_async::AsyncReader;
249    /// use tokio_stream::StreamExt;
250    ///
251    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
252    /// async fn example() -> Result<(), Box<dyn Error>> {
253    ///     let data = "\
254    /// city,country,pop
255    /// Boston,United States,4628910
256    /// ";
257    ///     let mut rdr = AsyncReader::from_reader(data.as_bytes());
258    ///     let mut iter = rdr.byte_records();
259    ///     assert_eq!(iter.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
260    ///     assert!(iter.next().await.is_none());
261    ///     Ok(())
262    /// }
263    /// ```
264    #[inline]
265    pub fn byte_records(&mut self) -> ByteRecordsStream<R> {
266        ByteRecordsStream::new(&mut self.0)
267    }
268
269    /// Returns an owned iterator over all records as raw bytes.
270    ///
271    /// Each item yielded by this iterator is a `Result<ByteRecord, Error>`.
272    /// Therefore, in order to access the record, callers must handle the
273    /// possibility of error (typically with `try!` or `?`).
274    ///
275    /// This is mostly useful when you want to return a CSV iterator or store
276    /// it somewhere.
277    ///
278    /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
279    /// default), then this does not include the first record.
280    ///
281    /// # Example
282    ///
283    /// ```
284    /// use std::error::Error;
285    /// use csv_async::AsyncReader;
286    /// use tokio_stream::StreamExt;
287    ///
288    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
289    /// async fn example() -> Result<(), Box<dyn Error>> {
290    ///     let data = "\
291    /// city,country,pop
292    /// Boston,United States,4628910
293    /// ";
294    ///     let rdr = AsyncReader::from_reader(data.as_bytes());
295    ///     let mut iter = rdr.into_byte_records();
296    ///     assert_eq!(iter.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
297    ///     assert!(iter.next().await.is_none());
298    ///     Ok(())
299    /// }
300    /// ```
301    #[inline]
302    pub fn into_byte_records(self) -> ByteRecordsIntoStream<'r, R> {
303        ByteRecordsIntoStream::new(self.0)
304    }
305
306    /// Returns a reference to the first row read by this parser.
307    ///
308    /// If no row has been read yet, then this will force parsing of the first
309    /// row.
310    ///
311    /// If there was a problem parsing the row or if it wasn't valid UTF-8,
312    /// then this returns an error.
313    ///
314    /// If the underlying reader emits EOF before any data, then this returns
315    /// an empty record.
316    ///
317    /// Note that this method may be used regardless of whether `has_headers`
318    /// was enabled (but it is enabled by default).
319    ///
320    /// # Example
321    ///
322    /// This example shows how to get the header row of CSV data. Notice that
323    /// the header row does not appear as a record in the iterator!
324    ///
325    /// ```
326    /// use std::error::Error;
327    /// use csv_async::AsyncReader;
328    /// use tokio_stream::StreamExt;
329    ///
330    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
331    /// async fn example() -> Result<(), Box<dyn Error>> {
332    ///     let data = "\
333    /// city,country,pop
334    /// Boston,United States,4628910
335    /// ";
336    ///     let mut rdr = AsyncReader::from_reader(data.as_bytes());
337    ///
338    ///     // We can read the headers before iterating.
339    ///     {
340    ///     // `headers` borrows from the reader, so we put this in its
341    ///     // own scope. That way, the borrow ends before we try iterating
342    ///     // below. Alternatively, we could clone the headers.
343    ///     let headers = rdr.headers().await?;
344    ///     assert_eq!(headers, vec!["city", "country", "pop"]);
345    ///     }
346    ///
347    ///     {
348    ///     let mut records = rdr.records();
349    ///     assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
350    ///     assert!(records.next().await.is_none());
351    ///     }
352    ///
353    ///     // We can also read the headers after iterating.
354    ///     let headers = rdr.headers().await?;
355    ///     assert_eq!(headers, vec!["city", "country", "pop"]);
356    ///     Ok(())
357    /// }
358    /// ```
359    #[inline]
360    pub async fn headers(&mut self) -> Result<&StringRecord> {
361        self.0.headers().await
362    }
363
364    /// Returns a reference to the first row read by this parser as raw bytes.
365    ///
366    /// If no row has been read yet, then this will force parsing of the first
367    /// row.
368    ///
369    /// If there was a problem parsing the row then this returns an error.
370    ///
371    /// If the underlying reader emits EOF before any data, then this returns
372    /// an empty record.
373    ///
374    /// Note that this method may be used regardless of whether `has_headers`
375    /// was enabled (but it is enabled by default).
376    ///
377    /// # Example
378    ///
379    /// This example shows how to get the header row of CSV data. Notice that
380    /// the header row does not appear as a record in the iterator!
381    ///
382    /// ```
383    /// use std::error::Error;
384    /// use csv_async::AsyncReader;
385    /// use tokio_stream::StreamExt;
386    ///
387    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
388    /// async fn example() -> Result<(), Box<dyn Error>> {
389    ///     let data = "\
390    /// city,country,pop
391    /// Boston,United States,4628910
392    /// ";
393    ///     let mut rdr = AsyncReader::from_reader(data.as_bytes());
394    ///
395    ///     // We can read the headers before iterating.
396    ///     {
397    ///     // `headers` borrows from the reader, so we put this in its
398    ///     // own scope. That way, the borrow ends before we try iterating
399    ///     // below. Alternatively, we could clone the headers.
400    ///     let headers = rdr.byte_headers().await?;
401    ///     assert_eq!(headers, vec!["city", "country", "pop"]);
402    ///     }
403    ///
404    ///     {
405    ///     let mut records = rdr.byte_records();
406    ///     assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
407    ///     assert!(records.next().await.is_none());
408    ///     }
409    ///
410    ///     // We can also read the headers after iterating.
411    ///     let headers = rdr.byte_headers().await?;
412    ///     assert_eq!(headers, vec!["city", "country", "pop"]);
413    ///     Ok(())
414    /// }
415    /// ```
416    #[inline]
417    pub async fn byte_headers(&mut self) -> Result<&ByteRecord> {
418        self.0.byte_headers().await
419    }
420
421    /// Set the headers of this CSV parser manually.
422    ///
423    /// This overrides any other setting (including `set_byte_headers`). Any
424    /// automatic detection of headers is disabled. This may be called at any
425    /// time.
426    ///
427    /// # Example
428    ///
429    /// ```
430    /// use std::error::Error;
431    /// use csv_async::{AsyncReader, StringRecord};
432    ///
433    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
434    /// async fn example() -> Result<(), Box<dyn Error>> {
435    ///     let data = "\
436    /// city,country,pop
437    /// Boston,United States,4628910
438    /// ";
439    ///     let mut rdr = AsyncReader::from_reader(data.as_bytes());
440    ///
441    ///     assert_eq!(rdr.headers().await?, vec!["city", "country", "pop"]);
442    ///     rdr.set_headers(StringRecord::from(vec!["a", "b", "c"]));
443    ///     assert_eq!(rdr.headers().await?, vec!["a", "b", "c"]);
444    ///
445    ///     Ok(())
446    /// }
447    /// ```
448    #[inline]
449    pub fn set_headers(&mut self, headers: StringRecord) {
450        self.0.set_headers(headers);
451    }
452
453    /// Set the headers of this CSV parser manually as raw bytes.
454    ///
455    /// This overrides any other setting (including `set_headers`). Any
456    /// automatic detection of headers is disabled. This may be called at any
457    /// time.
458    ///
459    /// # Example
460    ///
461    /// ```
462    /// use std::error::Error;
463    /// use csv_async::{AsyncReader, ByteRecord};
464    ///
465    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
466    /// async fn example() -> Result<(), Box<dyn Error>> {
467    ///     let data = "\
468    /// city,country,pop
469    /// Boston,United States,4628910
470    /// ";
471    ///     let mut rdr = AsyncReader::from_reader(data.as_bytes());
472    ///
473    ///     assert_eq!(rdr.byte_headers().await?, vec!["city", "country", "pop"]);
474    ///     rdr.set_byte_headers(ByteRecord::from(vec!["a", "b", "c"]));
475    ///     assert_eq!(rdr.byte_headers().await?, vec!["a", "b", "c"]);
476    ///
477    ///     Ok(())
478    /// }
479    /// ```
480    #[inline]
481    pub fn set_byte_headers(&mut self, headers: ByteRecord) {
482        self.0.set_byte_headers(headers);
483    }
484
485    /// Read a single row into the given record. Returns false when no more
486    /// records could be read.
487    ///
488    /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
489    /// default), then this will treat initial row as headers and read the first data record.
490    ///
491    /// This method is useful when you want to read records as fast as
492    /// as possible. It's less ergonomic than an iterator, but it permits the
493    /// caller to reuse the `StringRecord` allocation, which usually results
494    /// in higher throughput.
495    ///
496    /// Records read via this method are guaranteed to have a position set
497    /// on them, even if the reader is at EOF or if an error is returned.
498    ///
499    /// # Example
500    ///
501    /// ```
502    /// use std::error::Error;
503    /// use csv_async::{AsyncReader, StringRecord};
504    ///
505    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
506    /// async fn example() -> Result<(), Box<dyn Error>> {
507    ///     let data = "\
508    /// city,country,pop
509    /// Boston,United States,4628910
510    /// ";
511    ///     let mut rdr = AsyncReader::from_reader(data.as_bytes());
512    ///     let mut record = StringRecord::new();
513    ///
514    ///     if rdr.read_record(&mut record).await? {
515    ///         assert_eq!(record, vec!["Boston", "United States", "4628910"]);
516    ///         Ok(())
517    ///     } else {
518    ///         Err(From::from("expected at least one record but got none"))
519    ///     }
520    /// }
521    /// ```
522    #[inline]
523    pub async fn read_record(&mut self, record: &mut StringRecord) -> Result<bool> {
524        self.0.read_record(record).await
525    }
526
527    /// Read a single row into the given byte record. Returns false when no
528    /// more records could be read.
529    ///
530    /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
531    /// default), then this will treat initial row as headers and read the first data record.
532    ///
533    /// This method is useful when you want to read records as fast as
534    /// as possible. It's less ergonomic than an iterator, but it permits the
535    /// caller to reuse the `ByteRecord` allocation, which usually results
536    /// in higher throughput.
537    ///
538    /// Records read via this method are guaranteed to have a position set
539    /// on them, even if the reader is at EOF or if an error is returned.
540    ///
541    /// # Example
542    ///
543    /// ```
544    /// use std::error::Error;
545    /// use csv_async::{ByteRecord, AsyncReader};
546    ///
547    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
548    /// async fn example() -> Result<(), Box<dyn Error>> {
549    ///     let data = "\
550    /// city,country,pop
551    /// Boston,United States,4628910
552    /// ";
553    ///     let mut rdr = AsyncReader::from_reader(data.as_bytes());
554    ///     let mut record = ByteRecord::new();
555    ///
556    ///     if rdr.read_byte_record(&mut record).await? {
557    ///         assert_eq!(record, vec!["Boston", "United States", "4628910"]);
558    ///         Ok(())
559    ///     } else {
560    ///         Err(From::from("expected at least one record but got none"))
561    ///     }
562    /// }
563    /// ```
564    #[inline]
565    pub async fn read_byte_record(&mut self, record: &mut ByteRecord) -> Result<bool> {
566        self.0.read_byte_record(record).await
567    }
568
569    /// Return the current position of this CSV reader.
570    ///
571    /// The byte offset in the position returned can be used to `seek` this
572    /// reader. In particular, seeking to a position returned here on the same
573    /// data will result in parsing the same subsequent record.
574    ///
575    /// # Example: reading the position
576    ///
577    /// ```
578    /// use std::error::Error;
579    /// use std::io;
580    /// use csv_async::{AsyncReader, Position};
581    /// use tokio_stream::StreamExt;
582    ///
583    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
584    /// async fn example() -> Result<(), Box<dyn Error>> {
585    ///     let data = "\
586    /// city,country,popcount
587    /// Boston,United States,4628910
588    /// Concord,United States,42695
589    /// ";
590    ///     let rdr = AsyncReader::from_reader(io::Cursor::new(data));
591    ///     let mut iter = rdr.into_records();
592    ///     let mut pos = Position::new();
593    ///     loop {
594    ///         let next = iter.next().await;
595    ///         if let Some(next) = next {
596    ///             pos = next?.position().expect("Cursor should be at some valid position").clone();
597    ///         } else {
598    ///             break;
599    ///         }
600    ///     }
601    ///
602    ///     // `pos` should now be the position immediately before the last
603    ///     // record.
604    ///     assert_eq!(pos.byte(), 51);
605    ///     assert_eq!(pos.line(), 3);
606    ///     assert_eq!(pos.record(), 2);
607    ///     Ok(())
608    /// }
609    /// ```
610    #[inline]
611    pub fn position(&self) -> &Position {
612        self.0.position()
613    }
614
615    /// Returns true if and only if this reader has been exhausted.
616    ///
617    /// When this returns true, no more records can be read from this reader.
618    ///
619    /// # Example
620    ///
621    /// ```
622    /// # use tokio1 as tokio;
623    /// use std::error::Error;
624    /// use tokio::io;
625    /// use tokio_stream::StreamExt;
626    /// use csv_async::{AsyncReader, Position};
627    ///
628    /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
629    /// async fn example() -> Result<(), Box<dyn Error>> {
630    ///     let data = "\
631    /// city,country,popcount
632    /// Boston,United States,4628910
633    /// Concord,United States,42695
634    /// ";
635    ///     let mut rdr = AsyncReader::from_reader(data.as_bytes());
636    ///     assert!(!rdr.is_done());
637    ///     {
638    ///         let mut records = rdr.records();
639    ///         while let Some(record) = records.next().await {
640    ///             let _ = record?;
641    ///         }
642    ///     }
643    ///     assert!(rdr.is_done());
644    ///     Ok(())
645    /// }
646    /// ```
647    #[inline]
648    pub fn is_done(&self) -> bool {
649        self.0.is_done()
650    }
651
652    /// Returns true if and only if this reader has been configured to
653    /// interpret the first record as a header record.
654    #[inline]
655    pub fn has_headers(&self) -> bool {
656        self.0.has_headers()
657    }
658
659    /// Returns a reference to the underlying reader.
660    #[inline]
661    pub fn get_ref(&self) -> &R {
662        self.0.get_ref()
663    }
664
665    /// Returns a mutable reference to the underlying reader.
666    #[inline]
667    pub fn get_mut(&mut self) -> &mut R {
668        self.0.get_mut()
669    }
670
671    /// Unwraps this CSV reader, returning the underlying reader.
672    ///
673    /// Note that any leftover data inside this reader's internal buffer is
674    /// lost.
675    #[inline]
676    pub fn into_inner(self) -> R {
677        self.0.into_inner()
678    }
679}
680
681impl<R: io::AsyncRead + io::AsyncSeek + std::marker::Unpin> AsyncReader<R> {
682    /// Seeks the underlying reader to the position given.
683    ///
684    /// This comes with a few caveats:
685    ///
686    /// * Any internal buffer associated with this reader is cleared.
687    /// * If the given position does not correspond to a position immediately
688    ///   before the start of a record, then the behavior of this reader is
689    ///   unspecified.
690    /// * Any special logic that skips the first record in the CSV reader
691    ///   when reading or iterating over records is disabled.
692    ///
693    /// If the given position has a byte offset equivalent to the current
694    /// position, then no seeking is performed.
695    ///
696    /// If the header row has not already been read, then this will attempt
697    /// to read the header row before seeking. Therefore, it is possible that
698    /// this returns an error associated with reading CSV data.
699    ///
700    /// Note that seeking is performed based only on the byte offset in the
701    /// given position. Namely, the record or line numbers in the position may
702    /// be incorrect, but this will cause any future position generated by
703    /// this CSV reader to be similarly incorrect.
704    ///
705    /// # Example: seek to parse a record twice
706    ///
707    /// ```
708    /// # use tokio1 as tokio;
709    /// use std::error::Error;
710    /// use tokio::io;
711    /// use tokio_stream::StreamExt;
712    /// use csv_async::{AsyncReader, Position};
713    ///
714    /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
715    /// async fn example() -> Result<(), Box<dyn Error>> {
716    ///     let data = "\
717    /// city,country,popcount
718    /// Boston,United States,4628910
719    /// Concord,United States,42695
720    /// ";
721    ///     let mut rdr = AsyncReader::from_reader(std::io::Cursor::new(data));
722    ///     let mut pos = Position::new();
723    ///     {
724    ///     let mut records = rdr.records();
725    ///     loop {
726    ///         let next = records.next().await;
727    ///         if let Some(next) = next {
728    ///             pos = next?.position().expect("Cursor should be at some valid position").clone();
729    ///         } else {
730    ///             break;
731    ///         }
732    ///     }
733    ///     }
734    ///
735    ///     {
736    ///     // Now seek the reader back to `pos`. This will let us read the
737    ///     // last record again.
738    ///     rdr.seek(pos).await?;
739    ///     let mut records = rdr.into_records();
740    ///     if let Some(result) = records.next().await {
741    ///         let record = result?;
742    ///         assert_eq!(record, vec!["Concord", "United States", "42695"]);
743    ///         Ok(())
744    ///     } else {
745    ///         Err(From::from("expected at least one record but got none"))
746    ///     }
747    ///     }
748    /// }
749    /// ```
750    #[inline]
751    pub async fn seek(&mut self, pos: Position) -> Result<()> {
752        self.0.seek(pos).await
753    }
754
755    /// This is like `seek`, but provides direct control over how the seeking
756    /// operation is performed via `io::SeekFrom`.
757    ///
758    /// The `pos` position given *should* correspond the position indicated
759    /// by `seek_from`, but there is no requirement. If the `pos` position
760    /// given is incorrect, then the position information returned by this
761    /// reader will be similarly incorrect.
762    ///
763    /// If the header row has not already been read, then this will attempt
764    /// to read the header row before seeking. Therefore, it is possible that
765    /// this returns an error associated with reading CSV data.
766    ///
767    /// Unlike `seek`, this will always cause an actual seek to be performed.
768    #[inline]
769    pub async fn seek_raw(
770        &mut self,
771        seek_from: io::SeekFrom,
772        pos: Position,
773    ) -> Result<()> {
774        self.0.seek_raw(seek_from, pos).await
775    }
776
777    /// Rewinds the underlying reader to first data record.
778    ///
779    /// Function is aware of header presence.
780    /// After `rewind` record iterators will return first data record (skipping header if present), while
781    /// after `seek(0)` they will return header row (even if `has_header` is set).
782    /// 
783    /// # Example: Reads the same data multiply times
784    ///
785    /// ```
786    /// # use tokio1 as tokio;
787    /// use std::error::Error;
788    /// use tokio::io;
789    /// use tokio_stream::StreamExt;
790    /// use csv_async::AsyncReader;
791    ///
792    /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
793    /// async fn example() -> Result<(), Box<dyn Error>> {
794    ///     let data = "\
795    /// city,country,popcount
796    /// Boston,United States,4628910
797    /// Concord,United States,42695
798    /// ";
799    ///     let mut rdr = AsyncReader::from_reader(std::io::Cursor::new(data));
800    ///     let mut output = Vec::new();
801    ///     loop {
802    ///         let mut records = rdr.records();
803    ///         while let Some(rec) = records.next().await {
804    ///             output.push(rec?);
805    ///         }
806    ///         if output.len() >= 6 {
807    ///             break;
808    ///         } else {
809    ///             drop(records);
810    ///             rdr.rewind().await?;
811    ///         }
812    ///     }
813    ///     assert_eq!(output,
814    ///         vec![
815    ///             vec!["Boston", "United States", "4628910"],
816    ///             vec!["Concord", "United States", "42695"],
817    ///             vec!["Boston", "United States", "4628910"],
818    ///             vec!["Concord", "United States", "42695"],
819    ///             vec!["Boston", "United States", "4628910"],
820    ///             vec!["Concord", "United States", "42695"],
821    ///         ]);
822    ///     Ok(())
823    /// }
824    /// ```
825    #[inline]
826    pub async fn rewind(&mut self) -> Result<()> {
827        self.0.rewind().await
828    }
829}
830
#[cfg(test)]
mod tests {
    // Unit tests for the tokio-backed `AsyncReader`: record and header
    // reading, trimming, flexible record lengths, seeking, rewinding,
    // position tracking and I/O error handling.
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use tokio::io;
    use tokio::runtime::Runtime;
    use tokio_stream::StreamExt;

    use crate::byte_record::ByteRecord;
    use crate::error::ErrorKind;
    use crate::string_record::StringRecord;
    use crate::Trim;

    use super::{AsyncReader, AsyncReaderBuilder, Position};

    /// Shorthand for turning a string literal into the byte slice readers consume.
    fn b(s: &str) -> &[u8] {
        s.as_bytes()
    }

    /// Decodes a field the test knows to be valid UTF-8; panics otherwise.
    fn s(b: &[u8]) -> &str {
        ::std::str::from_utf8(b).unwrap()
    }

    /// Builds a `Position` from a byte offset, line number and record index.
    fn newpos(byte: u64, line: u64, record: u64) -> Position {
        let mut p = Position::new();
        p.set_byte(byte).set_line(line).set_record(record);
        p
    }

    /// Drains `stream` and returns how many items it yielded.
    async fn count(stream: impl StreamExt) -> usize {
        stream.fold(0, |acc, _| acc + 1).await
    }

    #[tokio::test]
    async fn read_byte_record() {
        let data = b("foo,\"b,ar\",baz\nabc,mno,xyz");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_reader(data);
        let mut rec = ByteRecord::new();

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("foo", s(&rec[0]));
        assert_eq!("b,ar", s(&rec[1]));
        assert_eq!("baz", s(&rec[2]));

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("abc", s(&rec[0]));
        assert_eq!("mno", s(&rec[1]));
        assert_eq!("xyz", s(&rec[2]));

        assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
    }

    #[tokio::test]
    async fn read_trimmed_records_and_headers() {
        let data = b("foo,  bar,\tbaz\n  1,  2,  3\n1\t,\t,3\t\t");
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .trim(Trim::All)
            .create_reader(data);
        let mut rec = ByteRecord::new();
        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!("1", s(&rec[0]));
        assert_eq!("2", s(&rec[1]));
        assert_eq!("3", s(&rec[2]));
        let mut rec = StringRecord::new();
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!("1", &rec[0]);
        assert_eq!("", &rec[1]);
        assert_eq!("3", &rec[2]);
        {
            let headers = rdr.headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!("foo", &headers[0]);
            assert_eq!("bar", &headers[1]);
            assert_eq!("baz", &headers[2]);
        }
    }

    #[tokio::test]
    async fn read_trimmed_header() {
        let data = b("foo,  bar,\tbaz\n  1,  2,  3\n1\t,\t,3\t\t");
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .trim(Trim::Headers)
            .create_reader(data);
        let mut rec = ByteRecord::new();
        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!("  1", s(&rec[0]));
        assert_eq!("  2", s(&rec[1]));
        assert_eq!("  3", s(&rec[2]));
        {
            let headers = rdr.headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!("foo", &headers[0]);
            assert_eq!("bar", &headers[1]);
            assert_eq!("baz", &headers[2]);
        }
    }

    #[tokio::test]
    async fn read_trimmed_header_invalid_utf8() {
        let data = &b"foo,  b\xFFar,\tbaz\na,b,c\nd,e,f"[..];
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .trim(Trim::Headers)
            .create_reader(data);
        let mut rec = StringRecord::new();

        // Force the headers to be read; the result itself is irrelevant here.
        let _ = rdr.read_record(&mut rec).await;
        // Check the byte headers are trimmed.
        {
            let headers = rdr.byte_headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!(b"foo", &headers[0]);
            assert_eq!(b"b\xFFar", &headers[1]);
            assert_eq!(b"baz", &headers[2]);
        }
        match *rdr.headers().await.unwrap_err().kind() {
            ErrorKind::Utf8 { pos: Some(ref pos), ref err } => {
                assert_eq!(pos, &newpos(0, 1, 0));
                assert_eq!(err.field(), 1);
                assert_eq!(err.valid_up_to(), 3);
            }
            ref err => panic!("match failed, got {:?}", err),
        }
    }

    #[tokio::test]
    async fn read_trimmed_records() {
        let data = b("foo,  bar,\tbaz\n  1,  2,  3\n1\t,\t,3\t\t");
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .trim(Trim::Fields)
            .create_reader(data);
        let mut rec = ByteRecord::new();
        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!("1", s(&rec[0]));
        assert_eq!("2", s(&rec[1]));
        assert_eq!("3", s(&rec[2]));
        {
            let headers = rdr.headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!("foo", &headers[0]);
            assert_eq!("  bar", &headers[1]);
            assert_eq!("\tbaz", &headers[2]);
        }
    }

    #[tokio::test]
    async fn read_record_unequal_fails() {
        let data = b("foo\nbar,baz");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_reader(data);
        let mut rec = ByteRecord::new();

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(1, rec.len());
        assert_eq!("foo", s(&rec[0]));

        match rdr.read_byte_record(&mut rec).await {
            Err(err) => match *err.kind() {
                ErrorKind::UnequalLengths {
                    expected_len: 1,
                    ref pos,
                    len: 2,
                } => {
                    assert_eq!(pos, &Some(newpos(4, 2, 1)));
                }
                ref wrong => panic!("match failed, got {:?}", wrong),
            },
            wrong => panic!("match failed, got {:?}", wrong),
        }
    }

    #[tokio::test]
    async fn read_record_unequal_ok() {
        let data = b("foo\nbar,baz");
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(false)
            .flexible(true)
            .create_reader(data);
        let mut rec = ByteRecord::new();

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(1, rec.len());
        assert_eq!("foo", s(&rec[0]));

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(2, rec.len());
        assert_eq!("bar", s(&rec[0]));
        assert_eq!("baz", s(&rec[1]));

        assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
    }

    // This tests that even if we get a CSV error, we can continue reading
    // if we want.
    #[tokio::test]
    async fn read_record_unequal_continue() {
        let data = b("foo\nbar,baz\nquux");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_reader(data);
        let mut rec = ByteRecord::new();

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(1, rec.len());
        assert_eq!("foo", s(&rec[0]));

        match rdr.read_byte_record(&mut rec).await {
            Err(err) => match *err.kind() {
                ErrorKind::UnequalLengths {
                    expected_len: 1,
                    ref pos,
                    len: 2,
                } => {
                    assert_eq!(pos, &Some(newpos(4, 2, 1)));
                }
                ref wrong => panic!("match failed, got {:?}", wrong),
            },
            wrong => panic!("match failed, got {:?}", wrong),
        }

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(1, rec.len());
        assert_eq!("quux", s(&rec[0]));

        assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
    }

    #[tokio::test]
    async fn read_record_headers() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f");
        let mut rdr = AsyncReaderBuilder::new().has_headers(true).create_reader(data);
        let mut rec = StringRecord::new();

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("a", &rec[0]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("d", &rec[0]);

        assert!(!rdr.read_record(&mut rec).await.unwrap());

        {
            let headers = rdr.byte_headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!(b"foo", &headers[0]);
            assert_eq!(b"bar", &headers[1]);
            assert_eq!(b"baz", &headers[2]);
        }
        {
            let headers = rdr.headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!("foo", &headers[0]);
            assert_eq!("bar", &headers[1]);
            assert_eq!("baz", &headers[2]);
        }
    }

    #[tokio::test]
    async fn read_record_headers_invalid_utf8() {
        let data = &b"foo,b\xFFar,baz\na,b,c\nd,e,f"[..];
        let mut rdr = AsyncReaderBuilder::new().has_headers(true).create_reader(data);
        let mut rec = StringRecord::new();

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("a", &rec[0]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("d", &rec[0]);

        assert!(!rdr.read_record(&mut rec).await.unwrap());

        // Check that we can read the headers as raw bytes, but that
        // if we read them as strings, we get an appropriate UTF-8 error.
        {
            let headers = rdr.byte_headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!(b"foo", &headers[0]);
            assert_eq!(b"b\xFFar", &headers[1]);
            assert_eq!(b"baz", &headers[2]);
        }
        match *rdr.headers().await.unwrap_err().kind() {
            ErrorKind::Utf8 { pos: Some(ref pos), ref err } => {
                assert_eq!(pos, &newpos(0, 1, 0));
                assert_eq!(err.field(), 1);
                assert_eq!(err.valid_up_to(), 1);
            }
            ref err => panic!("match failed, got {:?}", err),
        }
    }

    #[tokio::test]
    async fn read_record_no_headers_before() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_reader(data);
        let mut rec = StringRecord::new();

        {
            let headers = rdr.headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!("foo", &headers[0]);
            assert_eq!("bar", &headers[1]);
            assert_eq!("baz", &headers[2]);
        }

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("foo", &rec[0]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("a", &rec[0]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("d", &rec[0]);

        assert!(!rdr.read_record(&mut rec).await.unwrap());
    }

    #[tokio::test]
    async fn read_record_no_headers_after() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_reader(data);
        let mut rec = StringRecord::new();

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("foo", &rec[0]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("a", &rec[0]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("d", &rec[0]);

        assert!(!rdr.read_record(&mut rec).await.unwrap());

        let headers = rdr.headers().await.unwrap();
        assert_eq!(3, headers.len());
        assert_eq!("foo", &headers[0]);
        assert_eq!("bar", &headers[1]);
        assert_eq!("baz", &headers[2]);
    }

    #[tokio::test]
    async fn seek() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
        let mut rdr = AsyncReaderBuilder::new().create_reader(std::io::Cursor::new(data));
        rdr.seek(newpos(18, 3, 2)).await.unwrap();

        let mut rec = StringRecord::new();

        assert_eq!(18, rdr.position().byte());
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("d", &rec[0]);

        assert_eq!(24, rdr.position().byte());
        assert_eq!(4, rdr.position().line());
        assert_eq!(3, rdr.position().record());
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("g", &rec[0]);

        assert!(!rdr.read_record(&mut rec).await.unwrap());
    }

    // Test that we can read headers after seeking even if the headers weren't
    // explicitly read before seeking.
    #[tokio::test]
    async fn seek_headers_after() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
        let mut rdr = AsyncReaderBuilder::new().create_reader(std::io::Cursor::new(data));
        rdr.seek(newpos(18, 3, 2)).await.unwrap();
        assert_eq!(rdr.headers().await.unwrap(), vec!["foo", "bar", "baz"]);
    }

    // Test that we can read headers after seeking if the headers were read
    // before seeking.
    #[tokio::test]
    async fn seek_headers_before_after() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
        let mut rdr = AsyncReaderBuilder::new().create_reader(std::io::Cursor::new(data));
        let headers = rdr.headers().await.unwrap().clone();
        rdr.seek(newpos(18, 3, 2)).await.unwrap();
        assert_eq!(&headers, rdr.headers().await.unwrap());
    }

    // Test that even if we didn't read headers before seeking, if we seek to
    // the current byte offset, then no seeking is done and therefore we can
    // still read headers after seeking.
    #[tokio::test]
    async fn seek_headers_no_actual_seek() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
        let mut rdr = AsyncReaderBuilder::new().create_reader(std::io::Cursor::new(data));
        rdr.seek(Position::new()).await.unwrap();
        assert_eq!("foo", &rdr.headers().await.unwrap()[0]);
    }

    #[tokio::test]
    async fn rewind() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
        let mut rdr = AsyncReaderBuilder::new().create_reader(std::io::Cursor::new(data));

        let mut rec = StringRecord::new();
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("a", &rec[0]);

        // Header "foo,bar,baz\n" (12 bytes) + "a,b,c\n" (6 bytes) consumed.
        assert_eq!(18, rdr.position().byte());
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("d", &rec[0]);

        rdr.rewind().await.unwrap();

        // After rewinding, the header row is skipped again.
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("a", &rec[0]);
    }

    // Test that position info is reported correctly in absence of headers.
    #[tokio::test]
    async fn positions_no_headers() {
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(false)
            .create_reader("a,b,c\nx,y,z".as_bytes())
            .into_records();

        let pos = rdr.next().await.unwrap().unwrap().position().unwrap().clone();
        assert_eq!(pos.byte(), 0);
        assert_eq!(pos.line(), 1);
        assert_eq!(pos.record(), 0);

        let pos = rdr.next().await.unwrap().unwrap().position().unwrap().clone();
        assert_eq!(pos.byte(), 6);
        assert_eq!(pos.line(), 2);
        assert_eq!(pos.record(), 1);

        // Test that we are at end of stream, and properly signal this.
        assert!(rdr.next().await.is_none());
        // Test that we do not panic when polling past the end of the stream (Issue#22).
        assert!(rdr.next().await.is_none());
    }

    // Test that position info is reported correctly with headers.
    #[tokio::test]
    async fn positions_headers() {
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .create_reader("a,b,c\nx,y,z".as_bytes())
            .into_records();

        let pos = rdr.next().await.unwrap().unwrap().position().unwrap().clone();
        assert_eq!(pos.byte(), 6);
        assert_eq!(pos.line(), 2);
        assert_eq!(pos.record(), 1);
    }

    // Test that reading headers on empty data yields an empty record.
    #[tokio::test]
    async fn headers_on_empty_data() {
        let mut rdr = AsyncReaderBuilder::new().create_reader("".as_bytes());
        let r = rdr.byte_headers().await.unwrap();
        assert_eq!(r.len(), 0);
    }

    // Test that reading the first record on empty data works.
    #[tokio::test]
    async fn no_headers_on_empty_data() {
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_reader("".as_bytes());
        assert_eq!(count(rdr.records()).await, 0);
    }

    // Test that reading the first record on empty data works, even if
    // we've tried to read headers beforehand.
    #[tokio::test]
    async fn no_headers_on_empty_data_after_headers() {
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_reader("".as_bytes());
        assert_eq!(rdr.headers().await.unwrap().len(), 0);
        assert_eq!(count(rdr.records()).await, 0);
    }

    #[test]
    fn no_infinite_loop_on_io_errors() {
        /// A reader whose every read fails; the records stream must report
        /// the I/O error once and then terminate instead of retrying forever.
        /// A unit struct is automatically `Unpin`, so no manual impl is needed.
        struct FailingRead;
        impl io::AsyncRead for FailingRead {
            fn poll_read(
                self: Pin<&mut Self>,
                _cx: &mut Context,
                _buf: &mut io::ReadBuf,
            ) -> Poll<Result<(), io::Error>> {
                Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "Broken reader")))
            }
        }

        Runtime::new().unwrap().block_on(async {
            let mut record_results = AsyncReader::from_reader(FailingRead).into_records();
            let first_result = record_results.next().await;
            assert!(
                matches!(&first_result, Some(Err(e)) if matches!(e.kind(), ErrorKind::Io(_)))
            );
            assert!(record_results.next().await.is_none());
        });
    }
}