csv_async/async_readers/
ades_tokio.rs

1use tokio::io;
2use serde::de::DeserializeOwned;
3
4use crate::AsyncReaderBuilder;
5use crate::byte_record::{ByteRecord, Position};
6use crate::error::Result;
7use crate::string_record::StringRecord;
8use super::{
9    AsyncReaderImpl,
10    DeserializeRecordsStream, DeserializeRecordsIntoStream,
11    DeserializeRecordsStreamPos, DeserializeRecordsIntoStreamPos,
12};
13
14
15impl AsyncReaderBuilder {
16    /// Build a CSV `serde` deserializer from this configuration that reads data from `rdr`.
17    ///
18    /// Note that the CSV reader is buffered automatically, so you should not
19    /// wrap `rdr` in a buffered reader.
20    ///
21    /// # Example
22    ///
23    /// ```
24    /// # use tokio1 as tokio;
25    /// use std::error::Error;
26    /// use csv_async::AsyncReaderBuilder;
27    /// use serde::Deserialize;
28    /// use tokio_stream::StreamExt;
29    /// 
30    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
31    /// struct Row {
32    ///     city: String,
33    ///     country: String,
34    ///     pop: u64,
35    /// }
36    ///
37    /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
38    /// async fn example() -> Result<(), Box<dyn Error>> {
39    ///     let data = "\
40    /// city,country,pop
41    /// Boston,United States,4628910
42    /// Concord,United States,42695
43    /// ";
44    ///     let mut rdr = AsyncReaderBuilder::new().create_deserializer(data.as_bytes());
45    ///     let mut records = rdr.into_deserialize::<Row>();
46    ///     while let Some(record) = records.next().await {
47    ///         println!("{:?}", record?);
48    ///     }
49    ///     Ok(())
50    /// }
51    /// ```
52    pub fn create_deserializer<R: io::AsyncRead + Unpin + Send>(&self, rdr: R) -> AsyncDeserializer<R> {
53        AsyncDeserializer::new(self, rdr)
54    }
55}
56
/// An already configured CSV `serde` deserializer for the `tokio` runtime.
58///
59/// A CSV deserializer takes as input CSV data and transforms that into standard Rust
/// values. The deserializer reads CSV data as a sequence of records,
/// where a record is either a sequence of string fields or a structure with a derived
/// `serde::Deserialize` implementation.
63///
64/// # Configuration
65///
/// A CSV deserializer has a convenient constructor method, `from_reader`.
/// However, if you want to configure the CSV deserializer to use
/// a different delimiter or quote character (among many other things), then
/// you should use an [`AsyncReaderBuilder`](struct.AsyncReaderBuilder.html) to construct
/// an `AsyncDeserializer`. For example, to change the field delimiter:
71///
72/// ```
73/// # use tokio1 as tokio;
74/// use std::error::Error;
75/// use csv_async::AsyncReaderBuilder;
76/// use serde::Deserialize;
77/// use tokio_stream::StreamExt;
78/// 
79/// #[derive(Debug, Deserialize, Eq, PartialEq)]
80/// struct Row {
81///     city: String,
82///     country: String,
83///     pop: u64,
84/// }
85///
86/// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
87/// async fn example() -> Result<(), Box<dyn Error>> {
88///     let data = indoc::indoc! {"
89///         city;country;pop
90///         Boston;United States;4628910
91///     "};
92///     let mut rdr = AsyncReaderBuilder::new()
93///         .delimiter(b';')
94///         .create_deserializer(data.as_bytes());
95///
96///     let mut records = rdr.deserialize::<Row>();
97///     assert_eq!(records.next().await.unwrap()?,
98///                Row {city: "Boston".to_string(),
99///                     country: "United States".to_string(),
100///                     pop: 4628910 });
101///     Ok(())
102/// }
103/// ```
104///
105/// # Error handling
106///
107/// In general, CSV *parsing* does not ever return an error. That is, there is
108/// no such thing as malformed CSV data. Instead, this reader will prioritize
109/// finding a parse over rejecting CSV data that it does not understand. This
110/// choice was inspired by other popular CSV parsers, but also because it is
111/// pragmatic. CSV data varies wildly, so even if the CSV data is malformed,
112/// it might still be possible to work with the data. In the land of CSV, there
113/// is no "right" or "wrong," only "right" and "less right."
114///
115/// With that said, a number of errors can occur while reading CSV data:
116///
117/// * By default, all records in CSV data must have the same number of fields.
118///   If a record is found with a different number of fields than a prior
119///   record, then an error is returned. This behavior can be disabled by
120///   enabling flexible parsing via the `flexible` method on
121///   [`AsyncReaderBuilder`](struct.AsyncReaderBuilder.html).
122/// * When reading CSV data from a resource (like a file), it is possible for
123///   reading from the underlying resource to fail. This will return an error.
///   For subsequent calls to the reader after encountering such an error
125///   (unless `seek` is used), it will behave as if end of file had been
126///   reached, in order to avoid running into infinite loops when still
127///   attempting to read the next record when one has errored.
128/// * When reading CSV data into `String` or `&str` fields (e.g., via a
129///   [`StringRecord`](struct.StringRecord.html)), UTF-8 is strictly
130///   enforced. If CSV data is invalid UTF-8, then an error is returned. If
131///   you want to read invalid UTF-8, then you should use the byte oriented
132///   APIs such as [`ByteRecord`](struct.ByteRecord.html). If you need explicit
133///   support for another encoding entirely, then you'll need to use another
134///   crate to transcode your CSV data to UTF-8 before parsing it.
135/// * When using Serde to deserialize CSV data into Rust types, it is possible
136///   for a number of additional errors to occur. For example, deserializing
137///   a field `xyz` into an `i32` field will result in an error.
138///
139/// For more details on the precise semantics of errors, see the
140/// [`Error`](enum.Error.html) type.
#[derive(Debug)]
// Tuple newtype: the single field is the `AsyncReaderImpl` that the methods
// of this type forward their work to (accessed as `self.0`).
pub struct AsyncDeserializer<R>(AsyncReaderImpl<R>);
143
144impl<'r, R> AsyncDeserializer<R>
145where
146    R: io::AsyncRead + Unpin + Send + 'r,
147{
148    /// Create a new CSV reader given a builder and a source of underlying
149    /// bytes.
150    fn new(builder: &AsyncReaderBuilder, rdr: R) -> AsyncDeserializer<R> {
151        AsyncDeserializer(AsyncReaderImpl::new(builder, rdr))
152    }
153
154    /// Create a new CSV parser with a default configuration for the given
155    /// reader.
156    ///
    /// To customize CSV parsing, use an `AsyncReaderBuilder`.
158    ///
159    /// # Example
160    ///
161    /// ```
162    /// # use tokio1 as tokio;
163    /// use std::error::Error;
164    /// use csv_async::AsyncDeserializer;
165    /// use serde::Deserialize;
166    /// use tokio_stream::StreamExt;
167    /// 
168    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
169    /// struct Row {
170    ///     city: String,
171    ///     country: String,
172    ///     pop: u64,
173    /// }
174    ///
175    /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
176    /// async fn example() -> Result<(), Box<dyn Error>> {
177    ///     let data = "\
178    /// city,country,pop
179    /// Boston,United States,4628910
180    /// Concord,United States,42695
181    /// ";
182    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
183    ///     let mut records = rdr.into_deserialize::<Row>();
184    ///     while let Some(record) = records.next().await {
185    ///         println!("{:?}", record?);
186    ///     }
187    ///     Ok(())
188    /// }
189    /// ```
190    #[inline]
191    pub fn from_reader(rdr: R) -> AsyncDeserializer<R> {
192        AsyncReaderBuilder::new().create_deserializer(rdr)
193    }
194
195    /// Returns a borrowed stream over deserialized records.
196    ///
197    /// Each item yielded by this stream is a `Result<D, Error>`.
198    /// Therefore, in order to access the record, callers must handle the
199    /// possibility of error (typically with `?`).
200    ///
    /// If `has_headers` was enabled via an `AsyncReaderBuilder` (which is the
202    /// default), then this does not include the first record. Additionally,
203    /// if `has_headers` is enabled, then deserializing into a struct will
204    /// automatically align the values in each row to the fields of a struct
205    /// based on the header row.
206    /// 
207    /// Frequently turbo-fish notation is needed while calling this function:
208    /// `rdr.deserialize::<RecordType>();`
209    ///
210    /// # Example
211    ///
212    /// This shows how to deserialize CSV data into normal Rust structures. The
213    /// fields of the header row are used to match up the values in each row
214    /// to the fields of the struct.
215    ///
216    /// ```
217    /// # use tokio1 as tokio;
218    /// use std::error::Error;
219    ///
220    /// use csv_async::AsyncDeserializer;
221    /// use serde::Deserialize;
222    /// use tokio_stream::StreamExt;
223    ///
224    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
225    /// struct Row {
226    ///     city: String,
227    ///     country: String,
228    ///     #[serde(rename = "popcount")]
229    ///     population: u64,
230    /// }
231    ///
232    /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
233    /// async fn example() -> Result<(), Box<dyn Error>> {
234    ///     let data = "\
235    /// city,country,popcount
236    /// Boston,United States,4628910
237    /// ";
238    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
239    ///     let mut iter = rdr.deserialize();
240    ///
241    ///     if let Some(result) = iter.next().await {
242    ///         let record: Row = result?;
243    ///         assert_eq!(record, Row {
244    ///             city: "Boston".to_string(),
245    ///             country: "United States".to_string(),
246    ///             population: 4628910,
247    ///         });
248    ///         Ok(())
249    ///     } else {
250    ///         Err(From::from("expected at least one record but got none"))
251    ///     }
252    /// }
253    /// ```
254    ///
255    /// # Rules
256    ///
257    /// For the most part, any Rust type that maps straight-forwardly to a CSV
258    /// record is supported. This includes maps, structs, tuples and tuple
259    /// structs. Other Rust types, such as `Vec`s, arrays, and enums have
260    /// a more complicated story. In general, when working with CSV data, one
261    /// should avoid *nested sequences* as much as possible.
262    ///
263    /// Maps, structs, tuples and tuple structs map to CSV records in a simple
264    /// way. Tuples and tuple structs decode their fields in the order that
265    /// they are defined. Structs will do the same only if `has_headers` has
    /// been disabled using [`AsyncReaderBuilder`](struct.AsyncReaderBuilder.html),
267    /// otherwise, structs and maps are deserialized based on the fields
268    /// defined in the header row. (If there is no header row, then
269    /// deserializing into a map will result in an error.)
270    ///
271    /// Nested sequences are supported in a limited capacity. Namely, they
272    /// are flattened. As a result, it's often useful to use a `Vec` to capture
273    /// a "tail" of fields in a record:
274    ///
275    /// ```
276    /// # use tokio1 as tokio;
277    /// use std::error::Error;
278    ///
279    /// use csv_async::AsyncReaderBuilder;
280    /// use serde::Deserialize;
281    /// use tokio_stream::StreamExt;
282    ///
283    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
284    /// struct Row {
285    ///     label: String,
286    ///     values: Vec<i32>,
287    /// }
288    ///
289    /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
290    /// async fn example() -> Result<(), Box<dyn Error>> {
291    ///     let data = "foo,1,2,3";
292    ///     let mut rdr = AsyncReaderBuilder::new()
293    ///         .has_headers(false)
294    ///         .create_deserializer(data.as_bytes());
295    ///     let mut iter = rdr.deserialize();
296    ///
297    ///     if let Some(result) = iter.next().await {
298    ///         let record: Row = result?;
299    ///         assert_eq!(record, Row {
300    ///             label: "foo".to_string(),
301    ///             values: vec![1, 2, 3],
302    ///         });
303    ///         Ok(())
304    ///     } else {
305    ///         Err(From::from("expected at least one record but got none"))
306    ///     }
307    /// }
308    /// ```
309    ///
310    /// In the above example, adding another field to the `Row` struct after
311    /// the `values` field will result in a deserialization error. This is
312    /// because the deserializer doesn't know when to stop reading fields
313    /// into the `values` vector, so it will consume the rest of the fields in
314    /// the record leaving none left over for the additional field.
315    ///
316    /// Finally, simple enums in Rust can be deserialized as well. Namely,
317    /// enums must either be variants with no arguments or variants with a
318    /// single argument. Variants with no arguments are deserialized based on
319    /// which variant name the field matches. Variants with a single argument
320    /// are deserialized based on which variant can store the data. The latter
321    /// is only supported when using "untagged" enum deserialization. The
322    /// following example shows both forms in action:
323    ///
324    /// ```
325    /// # use tokio1 as tokio;
326    /// use std::error::Error;
327    ///
328    /// use csv_async::AsyncDeserializer;
329    /// use serde::Deserialize;
330    /// use tokio_stream::StreamExt;
331    ///
332    /// #[derive(Debug, Deserialize, PartialEq)]
333    /// struct Row {
334    ///     label: Label,
335    ///     value: Number,
336    /// }
337    ///
338    /// #[derive(Debug, Deserialize, PartialEq)]
339    /// #[serde(rename_all = "lowercase")]
340    /// enum Label {
341    ///     Celsius,
342    ///     Fahrenheit,
343    /// }
344    ///
345    /// #[derive(Debug, Deserialize, PartialEq)]
346    /// #[serde(untagged)]
347    /// enum Number {
348    ///     Integer(i64),
349    ///     Float(f64),
350    /// }
351    ///
352    /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
353    /// async fn example() -> Result<(), Box<dyn Error>> {
354    ///     let data = "\
355    /// label,value
356    /// celsius,22.2222
357    /// fahrenheit,72
358    /// ";
359    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
360    ///     let mut iter = rdr.deserialize();
361    ///
362    ///     // Read the first record.
363    ///     if let Some(result) = iter.next().await {
364    ///         let record: Row = result?;
365    ///         assert_eq!(record, Row {
366    ///             label: Label::Celsius,
367    ///             value: Number::Float(22.2222),
368    ///         });
369    ///     } else {
370    ///         return Err(From::from(
371    ///             "expected at least two records but got none"));
372    ///     }
373    ///
374    ///     // Read the second record.
375    ///     if let Some(result) = iter.next().await {
376    ///         let record: Row = result?;
377    ///         assert_eq!(record, Row {
378    ///             label: Label::Fahrenheit,
379    ///             value: Number::Integer(72),
380    ///         });
381    ///         Ok(())
382    ///     } else {
383    ///         Err(From::from(
384    ///             "expected at least two records but got only one"))
385    ///     }
386    /// }
387    /// ```
388    #[inline]
389    pub fn deserialize<D:'r>(&'r mut self) -> DeserializeRecordsStream<'r, R, D>
390    where
391        D: DeserializeOwned,
392    {
393        DeserializeRecordsStream::new(& mut self.0)
394    }
395
    /// Returns a borrowed stream over pairs of a deserialized record and the
    /// position in the reader stream before that record was read.
398    ///
399    /// Each item yielded by this stream is a `(Result<D, Error>, Position)`.
400    /// Therefore, in order to access the record, callers must handle the
401    /// possibility of error (typically with `?`).
402    /// 
403    /// Frequently turbo-fish notation is needed while calling this function:
404    /// `rdr.deserialize_with_pos::<RecordType>();`
405    ///
406    /// # Example
407    ///
408    /// This shows how to deserialize CSV data into normal Rust structures. The
409    /// fields of the header row are used to match up the values in each row
410    /// to the fields of the struct.
411    ///
412    /// ```
413    /// # use tokio1 as tokio;
414    /// use std::error::Error;
415    ///
416    /// use csv_async::AsyncDeserializer;
417    /// use serde::Deserialize;
418    /// use tokio_stream::StreamExt;
419    ///
420    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
421    /// struct Row {
422    ///     city: String,
423    ///     country: String,
424    ///     population: u64,
425    /// }
426    ///
427    /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
428    /// async fn example() -> Result<(), Box<dyn Error>> {
429    ///     let data = "\
430    /// city,country,population
431    /// Boston,United States,4628910
432    /// Concord,United States,42695
433    /// ";
434    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
435    ///     let mut iter = rdr.deserialize_with_pos();
436    ///
437    ///     if let Some((result, pos)) = iter.next().await {
438    ///         let record: Row = result?;
439    ///         assert_eq!(record, Row {
440    ///             city: "Boston".to_string(),
441    ///             country: "United States".to_string(),
442    ///             population: 4628910,
443    ///         });
444    ///         assert_eq!(pos.byte(), 24);
445    ///         assert_eq!(pos.line(), 2);
446    ///         assert_eq!(pos.record(), 1);
447    ///     } else {
448    ///         return Err(From::from("expected at least one record but got none"));
449    ///     }
450    ///     if let Some((result, pos)) = iter.next().await {
451    ///         let record: Row = result?;
452    ///         assert_eq!(record, Row {
453    ///             city: "Concord".to_string(),
454    ///             country: "United States".to_string(),
455    ///             population: 42695,
456    ///         });
457    ///         assert_eq!(pos.byte(), 53);
458    ///         assert_eq!(pos.line(), 3);
459    ///         assert_eq!(pos.record(), 2);
460    ///     } else {
461    ///         return Err(From::from("expected at least two records but got one only"));
462    ///     }
463    ///     assert!(iter.next().await.is_none());
464    ///     Ok(())
465    /// }
466    /// ```
467    #[inline]
468    pub fn deserialize_with_pos<D:'r>(&'r mut self) -> DeserializeRecordsStreamPos<'r, R, D>
469    where
470        D: DeserializeOwned,
471    {
472        DeserializeRecordsStreamPos::new(& mut self.0)
473    }
474
    /// Returns an owned stream over deserialized records.
476    ///
477    /// Each item yielded by this stream is a `Result<D, Error>`.
478    /// Therefore, in order to access the record, callers must handle the
479    /// possibility of error (typically with `?`).
480    ///
    /// If `has_headers` was enabled via an `AsyncReaderBuilder` (which is the
482    /// default), then this does not include the first record. Additionally,
483    /// if `has_headers` is enabled, then deserializing into a struct will
484    /// automatically align the values in each row to the fields of a struct
485    /// based on the header row.
486    /// 
487    /// Frequently turbo-fish notation is needed while calling this function:
488    /// `rdr.into_deserialize::<RecordType>();`
489    ///
490    /// # Example
491    ///
492    /// This shows how to deserialize CSV data into normal Rust structs. The
493    /// fields of the header row are used to match up the values in each row
494    /// to the fields of the struct.
495    ///
496    /// ```
497    /// # use tokio1 as tokio;
498    /// use std::error::Error;
499    ///
500    /// use csv_async::AsyncDeserializer;
501    /// use serde::Deserialize;
502    /// use tokio_stream::StreamExt;
503    ///
504    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
505    /// struct Row {
506    ///     city: String,
507    ///     country: String,
508    ///     #[serde(rename = "popcount")]
509    ///     population: u64,
510    /// }
511    ///
512    /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
513    /// async fn example() -> Result<(), Box<dyn Error>> {
514    ///     let data = "\
515    /// city,country,popcount
516    /// Boston,United States,4628910
517    /// ";
518    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
519    ///     let mut iter = rdr.into_deserialize();
520    ///
521    ///     if let Some(result) = iter.next().await {
522    ///         let record: Row = result?;
523    ///         assert_eq!(record, Row {
524    ///             city: "Boston".to_string(),
525    ///             country: "United States".to_string(),
526    ///             population: 4628910,
527    ///         });
528    ///         Ok(())
529    ///     } else {
530    ///         Err(From::from("expected at least one record but got none"))
531    ///     }
532    /// }
533    /// ```
534    #[inline]
535    pub fn into_deserialize<D:'r>(self) -> DeserializeRecordsIntoStream<'r, R, D>
536    where
537        D: DeserializeOwned,
538    {
539        DeserializeRecordsIntoStream::new(self.0)
540    }
541
542    /// Returns a owned stream over pairs of deserialized record and position
543    /// in reader stream before record read.
544    ///
545    #[inline]
546    pub fn into_deserialize_with_pos<D:'r>(self) -> DeserializeRecordsIntoStreamPos<'r, R, D>
547    where
548        D: DeserializeOwned,
549    {
550        DeserializeRecordsIntoStreamPos::new(self.0)
551    }
552
553    /// Returns a reference to the first row read by this parser.
554    ///
555    /// If no row has been read yet, then this will force parsing of the first
556    /// row.
557    ///
558    /// If there was a problem parsing the row or if it wasn't valid UTF-8,
559    /// then this returns an error.
560    ///
561    /// If the underlying reader emits EOF before any data, then this returns
562    /// an empty record.
563    ///
564    /// Note that this method may be used regardless of whether `has_headers`
565    /// was enabled (but it is enabled by default).
566    ///
567    /// # Example
568    ///
569    /// This example shows how to get the header row of CSV data. Notice that
570    /// the header row does not appear as a record in the iterator!
571    ///
572    /// ```
573    /// # use tokio1 as tokio;
574    /// use std::error::Error;
575    /// use csv_async::AsyncDeserializer;
576    /// use serde::Deserialize;
577    /// use tokio_stream::StreamExt;
578    /// 
579    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
580    /// struct Row {
581    ///     city: String,
582    ///     country: String,
583    ///     pop: u64,
584    /// }
585    ///
586    /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
587    /// async fn example() -> Result<(), Box<dyn Error>> {
588    ///     let data = "\
589    /// city,country,pop
590    /// Boston,United States,4628910
591    /// ";
592    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
593    ///
594    ///     // We can read the headers before iterating.
595    ///     {
596    ///     // `headers` borrows from the reader, so we put this in its
597    ///     // own scope. That way, the borrow ends before we try iterating
598    ///     // below. Alternatively, we could clone the headers.
599    ///     let headers = rdr.headers().await?;
600    ///     assert_eq!(headers, vec!["city", "country", "pop"]);
601    ///     }
602    ///
603    ///     {
604    ///     let mut records = rdr.deserialize::<Row>();
605    ///     assert_eq!(records.next().await.unwrap()?,
606    ///                Row {city: "Boston".to_string(),
607    ///                     country: "United States".to_string(),
608    ///                     pop: 4628910 });
609    ///     assert!(records.next().await.is_none());
610    ///     }
611    ///
612    ///     // We can also read the headers after iterating.
613    ///     let headers = rdr.headers().await?;
614    ///     assert_eq!(headers, vec!["city", "country", "pop"]);
615    ///     Ok(())
616    /// }
617    /// ```
618    #[inline]
619    pub async fn headers(&mut self) -> Result<&StringRecord> {
620        self.0.headers().await
621    }
622
623    /// Returns a reference to the first row read by this parser as raw bytes.
624    ///
625    /// If no row has been read yet, then this will force parsing of the first
626    /// row.
627    ///
628    /// If there was a problem parsing the row then this returns an error.
629    ///
630    /// If the underlying reader emits EOF before any data, then this returns
631    /// an empty record.
632    ///
633    /// Note that this method may be used regardless of whether `has_headers`
634    /// was enabled (but it is enabled by default).
635    ///
636    /// # Example
637    ///
638    /// This example shows how to get the header row of CSV data. Notice that
639    /// the header row does not appear as a record in the iterator!
640    ///
641    /// ```
642    /// # use tokio1 as tokio;
643    /// use std::error::Error;
644    /// use csv_async::AsyncDeserializer;
645    /// use serde::Deserialize;
646    /// use tokio_stream::StreamExt;
647    ///
648    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
649    /// struct Row {
650    ///     city: String,
651    ///     country: String,
652    ///     #[serde(rename = "pop")]
653    ///     population: u64,
654    /// }
655    /// 
656    /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
657    /// async fn example() -> Result<(), Box<dyn Error>> {
658    ///     let data = indoc::indoc! {"
659    ///         city,country,pop
660    ///         Boston,United States,4628910
661    ///     "};
662    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
663    ///
664    ///     // We can read the headers before iterating.
665    ///     {
666    ///     // `headers` borrows from the reader, so we put this in its
667    ///     // own scope. That way, the borrow ends before we try iterating
668    ///     // below. Alternatively, we could clone the headers.
669    ///     let headers = rdr.byte_headers().await?;
670    ///     assert_eq!(headers, vec!["city", "country", "pop"]);
671    ///     }
672    ///
673    ///     {
674    ///     let mut records = rdr.deserialize::<Row>();
675    ///     assert_eq!(records.next().await.unwrap()?,
676    ///                Row {city: "Boston".to_string(),
677    ///                     country: "United States".to_string(),
678    ///                     population: 4628910 });
679    ///     assert!(records.next().await.is_none());
680    ///     }
681    ///
682    ///     // We can also read the headers after iterating.
683    ///     let headers = rdr.byte_headers().await?;
684    ///     assert_eq!(headers, vec!["city", "country", "pop"]);
685    ///     Ok(())
686    /// }
687    /// ```
688    #[inline]
689    pub async fn byte_headers(&mut self) -> Result<&ByteRecord> {
690        self.0.byte_headers().await
691    }
692
693    /// Set the headers of this CSV parser manually.
694    ///
695    /// This overrides any other setting (including `set_byte_headers`). Any
696    /// automatic detection of headers is disabled. This may be called at any
697    /// time.
698    ///
699    /// # Example
700    ///
701    /// ```
702    /// use std::error::Error;
703    /// use csv_async::{AsyncDeserializer, StringRecord};
704    ///
705    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
706    /// async fn example() -> Result<(), Box<dyn Error>> {
707    ///     let data = "\
708    /// city,country,pop
709    /// Boston,United States,4628910
710    /// ";
711    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
712    ///
713    ///     assert_eq!(rdr.headers().await?, vec!["city", "country", "pop"]);
714    ///     rdr.set_headers(StringRecord::from(vec!["a", "b", "c"]));
715    ///     assert_eq!(rdr.headers().await?, vec!["a", "b", "c"]);
716    ///
717    ///     Ok(())
718    /// }
719    /// ```
720    #[inline]
721    pub fn set_headers(&mut self, headers: StringRecord) {
722        self.0.set_headers(headers);
723    }
724
725    /// Set the headers of this CSV parser manually as raw bytes.
726    ///
727    /// This overrides any other setting (including `set_headers`). Any
728    /// automatic detection of headers is disabled. This may be called at any
729    /// time.
730    ///
731    /// # Example
732    ///
733    /// ```
734    /// use std::error::Error;
735    /// use csv_async::{AsyncDeserializer, ByteRecord};
736    ///
737    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
738    /// async fn example() -> Result<(), Box<dyn Error>> {
739    ///     let data = "\
740    /// city,country,pop
741    /// Boston,United States,4628910
742    /// ";
743    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
744    ///
745    ///     assert_eq!(rdr.byte_headers().await?, vec!["city", "country", "pop"]);
746    ///     rdr.set_byte_headers(ByteRecord::from(vec!["a", "b", "c"]));
747    ///     assert_eq!(rdr.byte_headers().await?, vec!["a", "b", "c"]);
748    ///
749    ///     Ok(())
750    /// }
751    /// ```
752    #[inline]
753    pub fn set_byte_headers(&mut self, headers: ByteRecord) {
754        self.0.set_byte_headers(headers);
755    }
756
757    /// Read a single row into the given record. Returns false when no more
758    /// records could be read.
759    ///
    /// If `has_headers` was enabled via an `AsyncReaderBuilder` (which is the
761    /// default), then this will never read the first record.
762    ///
    /// This method is useful when you want to read records as fast as
    /// possible. It's less ergonomic than an iterator, but it permits the
765    /// caller to reuse the `StringRecord` allocation, which usually results
766    /// in higher throughput.
767    ///
768    /// Records read via this method are guaranteed to have a position set
769    /// on them, even if the reader is at EOF or if an error is returned.
770    ///
771    /// # Example
772    ///
773    /// ```
774    /// use std::error::Error;
775    /// use csv_async::{AsyncDeserializer, StringRecord};
776    ///
777    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
778    /// async fn example() -> Result<(), Box<dyn Error>> {
779    ///     let data = "\
780    /// city,country,pop
781    /// Boston,United States,4628910
782    /// ";
783    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
784    ///     let mut record = StringRecord::new();
785    ///
786    ///     if rdr.read_record(&mut record).await? {
787    ///         assert_eq!(record, vec!["Boston", "United States", "4628910"]);
788    ///         Ok(())
789    ///     } else {
790    ///         Err(From::from("expected at least one record but got none"))
791    ///     }
792    /// }
793    /// ```
794    #[inline]
795    pub async fn read_record(&mut self, record: &mut StringRecord) -> Result<bool> {
796        self.0.read_record(record).await
797    }
798
799    /// Read a single row into the given byte record. Returns false when no
800    /// more records could be read.
801    ///
802    /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
803    /// default), then this will never read the first record.
804    ///
805    /// This method is useful when you want to read records as fast as
806    /// as possible. It's less ergonomic than an iterator, but it permits the
807    /// caller to reuse the `ByteRecord` allocation, which usually results
808    /// in higher throughput.
809    ///
810    /// Records read via this method are guaranteed to have a position set
811    /// on them, even if the reader is at EOF or if an error is returned.
812    ///
813    /// # Example
814    ///
815    /// ```
816    /// use std::error::Error;
817    /// use csv_async::{ByteRecord, AsyncDeserializer};
818    ///
819    /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
820    /// async fn example() -> Result<(), Box<dyn Error>> {
821    ///     let data = "\
822    /// city,country,pop
823    /// Boston,United States,4628910
824    /// ";
825    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
826    ///     let mut record = ByteRecord::new();
827    ///
828    ///     if rdr.read_byte_record(&mut record).await? {
829    ///         assert_eq!(record, vec!["Boston", "United States", "4628910"]);
830    ///         Ok(())
831    ///     } else {
832    ///         Err(From::from("expected at least one record but got none"))
833    ///     }
834    /// }
835    /// ```
836    #[inline]
837    pub async fn read_byte_record(&mut self, record: &mut ByteRecord) -> Result<bool> {
838        self.0.read_byte_record(record).await
839    }
840
841    /// Return the current position of this CSV deserializer.
842    /// 
843    /// Because of borrowing rules this function can only be used when there is no
844    /// alive deserializer (which borrows mutable reader).
845    /// To know position during deserialization, `deserialize_with_pos` should be
846    /// used as shown in below example.
847    ///
848    /// The byte offset in the position returned can be used to `seek` this
849    /// deserializer. In particular, seeking to a position returned here on the same
850    /// data will result in parsing the same subsequent record.
851    ///
852    /// # Example: reading the position
853    ///
854    /// ```
855    /// # use tokio1 as tokio;
856    /// use std::error::Error;
857    /// use std::io;
858    /// use csv_async::{AsyncDeserializer, Position};
859    /// use serde::Deserialize;
860    /// use tokio_stream::StreamExt;
861    ///
862    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
863    /// struct Row {
864    ///     city: String,
865    ///     country: String,
866    ///     popcount: u64,
867    /// }
868    ///
869    /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
870    /// async fn example() -> Result<(), Box<dyn Error>> {
871    ///     let data = "\
872    /// city,country,popcount
873    /// Boston,United States,4628910
874    /// Concord,United States,42695
875    /// ";
876    ///     let mut rdr = AsyncDeserializer::from_reader(io::Cursor::new(data));
877    ///     let mut iter = rdr.deserialize_with_pos::<Row>();
878    ///     let mut pos_at_boston = Position::new();
879    ///     while let Some((rec, pos)) = iter.next().await {
880    ///         if rec?.city == "Boston" {
881    ///             pos_at_boston = pos;
882    ///         }
883    ///     }
884    ///     drop(iter); // releases rdr borrow by iter
885    ///     let pos_at_end = rdr.position();
886    /// 
887    ///     assert_eq!(pos_at_boston.byte(),  22);
888    ///     assert_eq!(pos_at_boston.line(),   2);
889    ///     assert_eq!(pos_at_boston.record(), 1);
890    ///     assert_eq!(pos_at_end.byte(),  79);
891    ///     assert_eq!(pos_at_end.line(),   4);
892    ///     assert_eq!(pos_at_end.record(), 3);
893    ///     Ok(())
894    /// }
895    /// ```
896    #[inline]
897    pub fn position(&self) -> &Position {
898        self.0.position()
899    }
900
901    /// Returns true if and only if this reader has been exhausted.
902    ///
903    /// When this returns true, no more records can be read from this reader.
904    ///
905    /// # Example
906    ///
907    /// ```
908    /// # use tokio1 as tokio;
909    /// use std::error::Error;
910    /// use csv_async::{AsyncDeserializer, Position};
911    /// use serde::Deserialize;
912    /// use tokio_stream::StreamExt;
913    ///
914    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
915    /// struct Row {
916    ///     city: String,
917    ///     country: String,
918    ///     popcount: u64,
919    /// }
920    ///
921    /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
922    /// async fn example() -> Result<(), Box<dyn Error>> {
923    ///     let data = "\
924    /// city,country,popcount
925    /// Boston,United States,4628910
926    /// Concord,United States,42695
927    /// ";
928    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
929    ///     assert!(!rdr.is_done());
930    ///     {
931    ///         let mut records = rdr.deserialize::<Row>();
932    ///         while let Some(record) = records.next().await {
933    ///             let _ = record?;
934    ///         }
935    ///     }
936    ///     assert!(rdr.is_done());
937    ///     Ok(())
938    /// }
939    /// ```
940    #[inline]
941    pub fn is_done(&self) -> bool {
942        self.0.is_done()
943    }
944
945    /// Returns true if and only if this reader has been configured to
946    /// interpret the first record as a header record.
947    #[inline]
948    pub fn has_headers(&self) -> bool {
949        self.0.has_headers()
950    }
951
952    /// Returns a reference to the underlying reader.
953    #[inline]
954    pub fn get_ref(&self) -> &R {
955        self.0.get_ref()
956    }
957
958    /// Returns a mutable reference to the underlying reader.
959    #[inline]
960    pub fn get_mut(&mut self) -> &mut R {
961        self.0.get_mut()
962    }
963
964    /// Unwraps this CSV reader, returning the underlying reader.
965    ///
966    /// Note that any leftover data inside this reader's internal buffer is
967    /// lost.
968    #[inline]
969    pub fn into_inner(self) -> R {
970        self.0.into_inner()
971    }
972}
973
974
#[cfg(test)]
mod tests {
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use serde::Deserialize;
    use tokio::io;
    use tokio_stream::StreamExt;

    use crate::byte_record::ByteRecord;
    use crate::error::ErrorKind;
    use crate::string_record::StringRecord;
    use crate::Trim;

    use super::{AsyncDeserializer, AsyncReaderBuilder, Position};

    /// Views a string as raw bytes so it can be handed to a reader.
    fn b(s: &str) -> &[u8] {
        s.as_bytes()
    }

    /// Decodes a field as UTF-8, panicking when it is not valid UTF-8.
    fn s(b: &[u8]) -> &str {
        ::std::str::from_utf8(b).unwrap()
    }

    /// Builds a `Position` with the given byte offset, line and record index.
    fn newpos(byte: u64, line: u64, record: u64) -> Position {
        let mut p = Position::new();
        p.set_byte(byte).set_line(line).set_record(record);
        p
    }

    /// Counts the items produced by `stream`.
    async fn count(stream: impl StreamExt) -> usize {
        stream.fold(0, |acc, _| acc + 1).await
    }

    /// Asserts that `record` consists of exactly the `expected` fields.
    ///
    /// Fields are decoded with `s` so that a failure prints readable text
    /// instead of a list of byte values.
    fn assert_byte_fields(record: &ByteRecord, expected: &[&str]) {
        assert_eq!(expected.len(), record.len());
        for (idx, want) in expected.iter().enumerate() {
            assert_eq!(*want, s(&record[idx]));
        }
    }

    /// Asserts that `record` consists of exactly the `expected` fields.
    fn assert_str_fields(record: &StringRecord, expected: &[&str]) {
        assert_eq!(expected.len(), record.len());
        for (idx, want) in expected.iter().enumerate() {
            assert_eq!(*want, &record[idx]);
        }
    }

    /// Asserts that `outcome` is the `UnequalLengths` error reported when a
    /// two-field record starting at byte 4 (line 2, record 1) follows a
    /// one-field first record.
    fn assert_second_record_unequal(outcome: crate::error::Result<bool>) {
        match outcome {
            Err(err) => match *err.kind() {
                ErrorKind::UnequalLengths {
                    expected_len: 1,
                    ref pos,
                    len: 2,
                } => {
                    assert_eq!(pos, &Some(newpos(4, 2, 1)));
                }
                ref wrong => panic!("match failed, got {:?}", wrong),
            },
            wrong => panic!("match failed, got {:?}", wrong),
        }
    }

    #[tokio::test]
    async fn read_byte_record() {
        let data = b("foo,\"b,ar\",baz\nabc,mno,xyz");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_deserializer(data);
        let mut rec = ByteRecord::new();

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_byte_fields(&rec, &["foo", "b,ar", "baz"]);

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_byte_fields(&rec, &["abc", "mno", "xyz"]);

        assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
    }

    #[tokio::test]
    async fn read_trimmed_records_and_headers() {
        let data = b("foo,  bar,\tbaz\n  1,  2,  3\n1\t,\t,3\t\t");
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .trim(Trim::All)
            .create_deserializer(data);

        let mut rec = ByteRecord::new();
        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_byte_fields(&rec, &["1", "2", "3"]);

        let mut rec = StringRecord::new();
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_str_fields(&rec, &["1", "", "3"]);

        assert_str_fields(rdr.headers().await.unwrap(), &["foo", "bar", "baz"]);
    }

    #[tokio::test]
    async fn read_trimmed_header() {
        let data = b("foo,  bar,\tbaz\n  1,  2,  3\n1\t,\t,3\t\t");
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .trim(Trim::Headers)
            .create_deserializer(data);

        // Only the headers are trimmed; record fields keep their padding.
        let mut rec = ByteRecord::new();
        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_byte_fields(&rec, &["  1", "  2", "  3"]);

        assert_str_fields(rdr.headers().await.unwrap(), &["foo", "bar", "baz"]);
    }

    #[tokio::test]
    async fn read_trimmed_header_invalid_utf8() {
        let data = &b"foo,  b\xFFar,\tbaz\na,b,c\nd,e,f"[..];
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .trim(Trim::Headers)
            .create_deserializer(data);
        let mut rec = StringRecord::new();

        // force the headers to be read
        let _ = rdr.read_record(&mut rec).await;
        // Check the byte headers are trimmed
        {
            let headers = rdr.byte_headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!(b"foo", &headers[0]);
            assert_eq!(b"b\xFFar", &headers[1]);
            assert_eq!(b"baz", &headers[2]);
        }
        match *rdr.headers().await.unwrap_err().kind() {
            ErrorKind::Utf8 { pos: Some(ref pos), ref err } => {
                assert_eq!(pos, &newpos(0, 1, 0));
                assert_eq!(err.field(), 1);
                assert_eq!(err.valid_up_to(), 3);
            }
            ref err => panic!("match failed, got {:?}", err),
        }
    }

    #[tokio::test]
    async fn read_trimmed_records() {
        let data = b("foo,  bar,\tbaz\n  1,  2,  3\n1\t,\t,3\t\t");
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .trim(Trim::Fields)
            .create_deserializer(data);

        let mut rec = ByteRecord::new();
        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_byte_fields(&rec, &["1", "2", "3"]);

        // Only record fields are trimmed; headers keep their padding.
        assert_str_fields(rdr.headers().await.unwrap(), &["foo", "  bar", "\tbaz"]);
    }

    #[tokio::test]
    async fn read_record_unequal_fails() {
        let data = b("foo\nbar,baz");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_deserializer(data);
        let mut rec = ByteRecord::new();

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_byte_fields(&rec, &["foo"]);

        assert_second_record_unequal(rdr.read_byte_record(&mut rec).await);
    }

    #[tokio::test]
    async fn read_record_unequal_ok() {
        let data = b("foo\nbar,baz");
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(false)
            .flexible(true)
            .create_deserializer(data);
        let mut rec = ByteRecord::new();

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_byte_fields(&rec, &["foo"]);

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_byte_fields(&rec, &["bar", "baz"]);

        assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
    }

    // This tests that even if we get a CSV error, we can continue reading
    // if we want.
    #[tokio::test]
    async fn read_record_unequal_continue() {
        let data = b("foo\nbar,baz\nquux");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_deserializer(data);
        let mut rec = ByteRecord::new();

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_byte_fields(&rec, &["foo"]);

        assert_second_record_unequal(rdr.read_byte_record(&mut rec).await);

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_byte_fields(&rec, &["quux"]);

        assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
    }

    #[tokio::test]
    async fn read_record_headers() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f");
        let mut rdr = AsyncReaderBuilder::new().has_headers(true).create_deserializer(data);
        let mut rec = StringRecord::new();

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_str_fields(&rec, &["a", "b", "c"]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_str_fields(&rec, &["d", "e", "f"]);

        assert!(!rdr.read_record(&mut rec).await.unwrap());

        assert_byte_fields(rdr.byte_headers().await.unwrap(), &["foo", "bar", "baz"]);
        assert_str_fields(rdr.headers().await.unwrap(), &["foo", "bar", "baz"]);
    }

    #[tokio::test]
    async fn read_record_headers_invalid_utf8() {
        let data = &b"foo,b\xFFar,baz\na,b,c\nd,e,f"[..];
        let mut rdr = AsyncReaderBuilder::new().has_headers(true).create_deserializer(data);
        let mut rec = StringRecord::new();

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_str_fields(&rec, &["a", "b", "c"]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_str_fields(&rec, &["d", "e", "f"]);

        assert!(!rdr.read_record(&mut rec).await.unwrap());

        // Check that we can read the headers as raw bytes, but that
        // if we read them as strings, we get an appropriate UTF-8 error.
        {
            let headers = rdr.byte_headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!(b"foo", &headers[0]);
            assert_eq!(b"b\xFFar", &headers[1]);
            assert_eq!(b"baz", &headers[2]);
        }
        match *rdr.headers().await.unwrap_err().kind() {
            ErrorKind::Utf8 { pos: Some(ref pos), ref err } => {
                assert_eq!(pos, &newpos(0, 1, 0));
                assert_eq!(err.field(), 1);
                assert_eq!(err.valid_up_to(), 1);
            }
            ref err => panic!("match failed, got {:?}", err),
        }
    }

    #[tokio::test]
    async fn read_record_no_headers_before() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_deserializer(data);
        let mut rec = StringRecord::new();

        // Asking for headers first must not swallow the first record.
        assert_str_fields(rdr.headers().await.unwrap(), &["foo", "bar", "baz"]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_str_fields(&rec, &["foo", "bar", "baz"]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_str_fields(&rec, &["a", "b", "c"]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_str_fields(&rec, &["d", "e", "f"]);

        assert!(!rdr.read_record(&mut rec).await.unwrap());
    }

    #[tokio::test]
    async fn read_record_no_headers_after() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_deserializer(data);
        let mut rec = StringRecord::new();

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_str_fields(&rec, &["foo", "bar", "baz"]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_str_fields(&rec, &["a", "b", "c"]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_str_fields(&rec, &["d", "e", "f"]);

        assert!(!rdr.read_record(&mut rec).await.unwrap());

        // Headers remain available after every record has been consumed.
        assert_str_fields(rdr.headers().await.unwrap(), &["foo", "bar", "baz"]);
    }

    #[derive(Debug, Deserialize, Eq, PartialEq)]
    struct Row1([String; 3]);

    // Test that position info is reported correctly in absence of headers.
    #[tokio::test]
    async fn positions_no_headers() {
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(false)
            .create_deserializer("a,b,c\nx,y,z".as_bytes())
            .into_deserialize_with_pos::<Row1>();

        let (_, pos) = rdr.next().await.unwrap();
        assert_eq!(pos, newpos(0, 1, 0));

        let (_, pos) = rdr.next().await.unwrap();
        assert_eq!(pos, newpos(6, 2, 1));

        // Test that we are at end of stream, and properly signal this.
        assert!(rdr.next().await.is_none());
        // Polling past the end of the stream must not panic (Issue#22).
        assert!(rdr.next().await.is_none());
    }

    // Test that position info is reported correctly with headers.
    #[tokio::test]
    async fn positions_headers() {
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .create_deserializer("a,b,c\nx,y,z".as_bytes())
            .into_deserialize_with_pos::<Row1>();

        let (_, pos) = rdr.next().await.unwrap();
        // The header row is counted as record 0; not strictly necessary, but
        // it keeps compatibility with the 'csv' crate.
        assert_eq!(pos, newpos(6, 2, 1));
    }

    // Test that reading headers on empty data yields an empty record.
    #[tokio::test]
    async fn headers_on_empty_data() {
        let mut rdr = AsyncReaderBuilder::new().create_deserializer("".as_bytes());
        let r = rdr.byte_headers().await.unwrap();
        assert_eq!(r.len(), 0);
    }

    // Test that reading the first record on empty data works.
    #[tokio::test]
    async fn no_headers_on_empty_data() {
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_deserializer("".as_bytes());
        assert_eq!(count(rdr.deserialize::<Row1>()).await, 0);
    }

    // Test that reading the first record on empty data works, even if
    // we've tried to read headers before hand.
    #[tokio::test]
    async fn no_headers_on_empty_data_after_headers() {
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_deserializer("".as_bytes());
        assert_eq!(rdr.headers().await.unwrap().len(), 0);
        assert_eq!(count(rdr.deserialize::<Row1>()).await, 0);
    }

    // Test that a reader which always fails yields one I/O error and then
    // ends the stream instead of repeating the error forever.
    #[tokio::test]
    async fn no_infinite_loop_on_io_errors() {
        struct FailingRead;
        impl io::AsyncRead for FailingRead {
            fn poll_read(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
                _buf: &mut tokio::io::ReadBuf<'_>,
            ) -> Poll<Result<(), io::Error>> {
                Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "Broken reader")))
            }
        }
        impl Unpin for FailingRead {}

        #[derive(Deserialize)]
        struct Fake;

        let mut record_results =
            AsyncDeserializer::from_reader(FailingRead).into_deserialize::<Fake>();
        let first_result = record_results.next().await;
        assert!(
            matches!(&first_result, Some(Err(e)) if matches!(e.kind(), ErrorKind::Io(_)))
        );
        assert!(record_results.next().await.is_none());
    }
}