csv_async/async_readers/ades_tokio.rs
1use tokio::io;
2use serde::de::DeserializeOwned;
3
4use crate::AsyncReaderBuilder;
5use crate::byte_record::{ByteRecord, Position};
6use crate::error::Result;
7use crate::string_record::StringRecord;
8use super::{
9 AsyncReaderImpl,
10 DeserializeRecordsStream, DeserializeRecordsIntoStream,
11 DeserializeRecordsStreamPos, DeserializeRecordsIntoStreamPos,
12};
13
14
15impl AsyncReaderBuilder {
16 /// Build a CSV `serde` deserializer from this configuration that reads data from `rdr`.
17 ///
18 /// Note that the CSV reader is buffered automatically, so you should not
19 /// wrap `rdr` in a buffered reader.
20 ///
21 /// # Example
22 ///
23 /// ```
24 /// # use tokio1 as tokio;
25 /// use std::error::Error;
26 /// use csv_async::AsyncReaderBuilder;
27 /// use serde::Deserialize;
28 /// use tokio_stream::StreamExt;
29 ///
30 /// #[derive(Debug, Deserialize, Eq, PartialEq)]
31 /// struct Row {
32 /// city: String,
33 /// country: String,
34 /// pop: u64,
35 /// }
36 ///
37 /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
38 /// async fn example() -> Result<(), Box<dyn Error>> {
39 /// let data = "\
40 /// city,country,pop
41 /// Boston,United States,4628910
42 /// Concord,United States,42695
43 /// ";
44 /// let mut rdr = AsyncReaderBuilder::new().create_deserializer(data.as_bytes());
45 /// let mut records = rdr.into_deserialize::<Row>();
46 /// while let Some(record) = records.next().await {
47 /// println!("{:?}", record?);
48 /// }
49 /// Ok(())
50 /// }
51 /// ```
52 pub fn create_deserializer<R: io::AsyncRead + Unpin + Send>(&self, rdr: R) -> AsyncDeserializer<R> {
53 AsyncDeserializer::new(self, rdr)
54 }
55}
56
/// An already configured CSV `serde` deserializer for the `tokio` runtime.
58///
/// A CSV deserializer takes CSV data as input and transforms it into standard Rust
/// values. The reader reads CSV data as a sequence of records,
/// where a record is either a sequence of string fields or a structure with a derived
/// `serde::Deserialize` implementation.
63///
64/// # Configuration
65///
/// A CSV deserializer has a convenient constructor method, `from_reader`.
/// However, if you want to configure the CSV deserializer to use
/// a different delimiter or quote character (among many other things), then
/// you should use an [`AsyncReaderBuilder`](struct.AsyncReaderBuilder.html) to construct
/// an `AsyncDeserializer`. For example, to change the field delimiter:
71///
72/// ```
73/// # use tokio1 as tokio;
74/// use std::error::Error;
75/// use csv_async::AsyncReaderBuilder;
76/// use serde::Deserialize;
77/// use tokio_stream::StreamExt;
78///
79/// #[derive(Debug, Deserialize, Eq, PartialEq)]
80/// struct Row {
81/// city: String,
82/// country: String,
83/// pop: u64,
84/// }
85///
86/// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
87/// async fn example() -> Result<(), Box<dyn Error>> {
88/// let data = indoc::indoc! {"
89/// city;country;pop
90/// Boston;United States;4628910
91/// "};
92/// let mut rdr = AsyncReaderBuilder::new()
93/// .delimiter(b';')
94/// .create_deserializer(data.as_bytes());
95///
96/// let mut records = rdr.deserialize::<Row>();
97/// assert_eq!(records.next().await.unwrap()?,
98/// Row {city: "Boston".to_string(),
99/// country: "United States".to_string(),
100/// pop: 4628910 });
101/// Ok(())
102/// }
103/// ```
104///
105/// # Error handling
106///
107/// In general, CSV *parsing* does not ever return an error. That is, there is
108/// no such thing as malformed CSV data. Instead, this reader will prioritize
109/// finding a parse over rejecting CSV data that it does not understand. This
110/// choice was inspired by other popular CSV parsers, but also because it is
111/// pragmatic. CSV data varies wildly, so even if the CSV data is malformed,
112/// it might still be possible to work with the data. In the land of CSV, there
113/// is no "right" or "wrong," only "right" and "less right."
114///
115/// With that said, a number of errors can occur while reading CSV data:
116///
117/// * By default, all records in CSV data must have the same number of fields.
118/// If a record is found with a different number of fields than a prior
119/// record, then an error is returned. This behavior can be disabled by
120/// enabling flexible parsing via the `flexible` method on
121/// [`AsyncReaderBuilder`](struct.AsyncReaderBuilder.html).
122/// * When reading CSV data from a resource (like a file), it is possible for
123/// reading from the underlying resource to fail. This will return an error.
///   After encountering such an error, subsequent calls to the reader
///   (unless `seek` is used) will behave as if end of file had been reached,
///   in order to avoid running into an infinite loop by repeatedly trying to
///   read a record that has already failed.
128/// * When reading CSV data into `String` or `&str` fields (e.g., via a
129/// [`StringRecord`](struct.StringRecord.html)), UTF-8 is strictly
130/// enforced. If CSV data is invalid UTF-8, then an error is returned. If
131/// you want to read invalid UTF-8, then you should use the byte oriented
132/// APIs such as [`ByteRecord`](struct.ByteRecord.html). If you need explicit
133/// support for another encoding entirely, then you'll need to use another
134/// crate to transcode your CSV data to UTF-8 before parsing it.
135/// * When using Serde to deserialize CSV data into Rust types, it is possible
136/// for a number of additional errors to occur. For example, deserializing
137/// a field `xyz` into an `i32` field will result in an error.
138///
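/// For example, here is a minimal sketch of the first point above, showing how
/// a record with a mismatched number of fields surfaces as an error (matching
/// on `ErrorKind` is just one way to inspect it):
///
/// ```
/// # use tokio1 as tokio;
/// use std::error::Error;
/// use csv_async::{AsyncReaderBuilder, ErrorKind, StringRecord};
///
/// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
/// async fn example() -> Result<(), Box<dyn Error>> {
///     let data = "\
/// a,b,c
/// x,y
/// ";
///     let mut rdr = AsyncReaderBuilder::new()
///         .has_headers(false)
///         .create_deserializer(data.as_bytes());
///     let mut record = StringRecord::new();
///
///     // The first record parses fine.
///     assert!(rdr.read_record(&mut record).await?);
///     // The second record has fewer fields, which is an error by default.
///     match rdr.read_record(&mut record).await {
///         Err(err) => match err.kind() {
///             ErrorKind::UnequalLengths { .. } => Ok(()),
///             other => Err(format!("unexpected error: {:?}", other).into()),
///         },
///         Ok(_) => Err(From::from("expected an UnequalLengths error")),
///     }
/// }
/// ```
///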
139/// For more details on the precise semantics of errors, see the
140/// [`Error`](enum.Error.html) type.
141#[derive(Debug)]
142pub struct AsyncDeserializer<R>(AsyncReaderImpl<R>);
143
144impl<'r, R> AsyncDeserializer<R>
145where
146 R: io::AsyncRead + Unpin + Send + 'r,
147{
148 /// Create a new CSV reader given a builder and a source of underlying
149 /// bytes.
150 fn new(builder: &AsyncReaderBuilder, rdr: R) -> AsyncDeserializer<R> {
151 AsyncDeserializer(AsyncReaderImpl::new(builder, rdr))
152 }
153
    /// Create a new CSV deserializer with a default configuration for the given
    /// reader.
    ///
    /// To customize CSV parsing, use an `AsyncReaderBuilder`.
158 ///
159 /// # Example
160 ///
161 /// ```
162 /// # use tokio1 as tokio;
163 /// use std::error::Error;
164 /// use csv_async::AsyncDeserializer;
165 /// use serde::Deserialize;
166 /// use tokio_stream::StreamExt;
167 ///
168 /// #[derive(Debug, Deserialize, Eq, PartialEq)]
169 /// struct Row {
170 /// city: String,
171 /// country: String,
172 /// pop: u64,
173 /// }
174 ///
175 /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
176 /// async fn example() -> Result<(), Box<dyn Error>> {
177 /// let data = "\
178 /// city,country,pop
179 /// Boston,United States,4628910
180 /// Concord,United States,42695
181 /// ";
182 /// let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
183 /// let mut records = rdr.into_deserialize::<Row>();
184 /// while let Some(record) = records.next().await {
185 /// println!("{:?}", record?);
186 /// }
187 /// Ok(())
188 /// }
189 /// ```
190 #[inline]
191 pub fn from_reader(rdr: R) -> AsyncDeserializer<R> {
192 AsyncReaderBuilder::new().create_deserializer(rdr)
193 }
194
195 /// Returns a borrowed stream over deserialized records.
196 ///
197 /// Each item yielded by this stream is a `Result<D, Error>`.
198 /// Therefore, in order to access the record, callers must handle the
199 /// possibility of error (typically with `?`).
200 ///
    /// If `has_headers` was enabled via an `AsyncReaderBuilder` (which is the
202 /// default), then this does not include the first record. Additionally,
203 /// if `has_headers` is enabled, then deserializing into a struct will
204 /// automatically align the values in each row to the fields of a struct
205 /// based on the header row.
206 ///
    /// Frequently, turbofish notation is needed when calling this function:
208 /// `rdr.deserialize::<RecordType>();`
209 ///
210 /// # Example
211 ///
212 /// This shows how to deserialize CSV data into normal Rust structures. The
213 /// fields of the header row are used to match up the values in each row
214 /// to the fields of the struct.
215 ///
216 /// ```
217 /// # use tokio1 as tokio;
218 /// use std::error::Error;
219 ///
220 /// use csv_async::AsyncDeserializer;
221 /// use serde::Deserialize;
222 /// use tokio_stream::StreamExt;
223 ///
224 /// #[derive(Debug, Deserialize, Eq, PartialEq)]
225 /// struct Row {
226 /// city: String,
227 /// country: String,
228 /// #[serde(rename = "popcount")]
229 /// population: u64,
230 /// }
231 ///
232 /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
233 /// async fn example() -> Result<(), Box<dyn Error>> {
234 /// let data = "\
235 /// city,country,popcount
236 /// Boston,United States,4628910
237 /// ";
238 /// let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
239 /// let mut iter = rdr.deserialize();
240 ///
241 /// if let Some(result) = iter.next().await {
242 /// let record: Row = result?;
243 /// assert_eq!(record, Row {
244 /// city: "Boston".to_string(),
245 /// country: "United States".to_string(),
246 /// population: 4628910,
247 /// });
248 /// Ok(())
249 /// } else {
250 /// Err(From::from("expected at least one record but got none"))
251 /// }
252 /// }
253 /// ```
254 ///
255 /// # Rules
256 ///
    /// For the most part, any Rust type that maps straightforwardly to a CSV
258 /// record is supported. This includes maps, structs, tuples and tuple
259 /// structs. Other Rust types, such as `Vec`s, arrays, and enums have
260 /// a more complicated story. In general, when working with CSV data, one
261 /// should avoid *nested sequences* as much as possible.
262 ///
263 /// Maps, structs, tuples and tuple structs map to CSV records in a simple
264 /// way. Tuples and tuple structs decode their fields in the order that
265 /// they are defined. Structs will do the same only if `has_headers` has
    /// been disabled using [`AsyncReaderBuilder`](struct.AsyncReaderBuilder.html);
267 /// otherwise, structs and maps are deserialized based on the fields
268 /// defined in the header row. (If there is no header row, then
269 /// deserializing into a map will result in an error.)
270 ///
271 /// Nested sequences are supported in a limited capacity. Namely, they
272 /// are flattened. As a result, it's often useful to use a `Vec` to capture
273 /// a "tail" of fields in a record:
274 ///
275 /// ```
276 /// # use tokio1 as tokio;
277 /// use std::error::Error;
278 ///
279 /// use csv_async::AsyncReaderBuilder;
280 /// use serde::Deserialize;
281 /// use tokio_stream::StreamExt;
282 ///
283 /// #[derive(Debug, Deserialize, Eq, PartialEq)]
284 /// struct Row {
285 /// label: String,
286 /// values: Vec<i32>,
287 /// }
288 ///
289 /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
290 /// async fn example() -> Result<(), Box<dyn Error>> {
291 /// let data = "foo,1,2,3";
292 /// let mut rdr = AsyncReaderBuilder::new()
293 /// .has_headers(false)
294 /// .create_deserializer(data.as_bytes());
295 /// let mut iter = rdr.deserialize();
296 ///
297 /// if let Some(result) = iter.next().await {
298 /// let record: Row = result?;
299 /// assert_eq!(record, Row {
300 /// label: "foo".to_string(),
301 /// values: vec![1, 2, 3],
302 /// });
303 /// Ok(())
304 /// } else {
305 /// Err(From::from("expected at least one record but got none"))
306 /// }
307 /// }
308 /// ```
309 ///
310 /// In the above example, adding another field to the `Row` struct after
311 /// the `values` field will result in a deserialization error. This is
312 /// because the deserializer doesn't know when to stop reading fields
    /// into the `values` vector, so it will consume the rest of the fields in
    /// the record, leaving none left over for the additional field.
315 ///
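    /// A quick sketch of that failure mode (the extra `tail` field below is
    /// purely illustrative):
    ///
    /// ```
    /// # use tokio1 as tokio;
    /// use std::error::Error;
    ///
    /// use csv_async::AsyncReaderBuilder;
    /// use serde::Deserialize;
    /// use tokio_stream::StreamExt;
    ///
    /// #[derive(Debug, Deserialize)]
    /// struct BadRow {
    ///     label: String,
    ///     values: Vec<i32>,
    ///     tail: String,
    /// }
    ///
    /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
    /// async fn example() -> Result<(), Box<dyn Error>> {
    ///     let data = "foo,1,2,3";
    ///     let mut rdr = AsyncReaderBuilder::new()
    ///         .has_headers(false)
    ///         .create_deserializer(data.as_bytes());
    ///     let mut iter = rdr.deserialize::<BadRow>();
    ///
    ///     // `values` soaks up the remaining fields, so nothing is left for
    ///     // `tail` and deserializing this record fails.
    ///     assert!(iter.next().await.unwrap().is_err());
    ///     Ok(())
    /// }
    /// ```
    ///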
316 /// Finally, simple enums in Rust can be deserialized as well. Namely,
317 /// enums must either be variants with no arguments or variants with a
318 /// single argument. Variants with no arguments are deserialized based on
319 /// which variant name the field matches. Variants with a single argument
320 /// are deserialized based on which variant can store the data. The latter
321 /// is only supported when using "untagged" enum deserialization. The
322 /// following example shows both forms in action:
323 ///
324 /// ```
325 /// # use tokio1 as tokio;
326 /// use std::error::Error;
327 ///
328 /// use csv_async::AsyncDeserializer;
329 /// use serde::Deserialize;
330 /// use tokio_stream::StreamExt;
331 ///
332 /// #[derive(Debug, Deserialize, PartialEq)]
333 /// struct Row {
334 /// label: Label,
335 /// value: Number,
336 /// }
337 ///
338 /// #[derive(Debug, Deserialize, PartialEq)]
339 /// #[serde(rename_all = "lowercase")]
340 /// enum Label {
341 /// Celsius,
342 /// Fahrenheit,
343 /// }
344 ///
345 /// #[derive(Debug, Deserialize, PartialEq)]
346 /// #[serde(untagged)]
347 /// enum Number {
348 /// Integer(i64),
349 /// Float(f64),
350 /// }
351 ///
352 /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
353 /// async fn example() -> Result<(), Box<dyn Error>> {
354 /// let data = "\
355 /// label,value
356 /// celsius,22.2222
357 /// fahrenheit,72
358 /// ";
359 /// let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
360 /// let mut iter = rdr.deserialize();
361 ///
362 /// // Read the first record.
363 /// if let Some(result) = iter.next().await {
364 /// let record: Row = result?;
365 /// assert_eq!(record, Row {
366 /// label: Label::Celsius,
367 /// value: Number::Float(22.2222),
368 /// });
369 /// } else {
370 /// return Err(From::from(
371 /// "expected at least two records but got none"));
372 /// }
373 ///
374 /// // Read the second record.
375 /// if let Some(result) = iter.next().await {
376 /// let record: Row = result?;
377 /// assert_eq!(record, Row {
378 /// label: Label::Fahrenheit,
379 /// value: Number::Integer(72),
380 /// });
381 /// Ok(())
382 /// } else {
383 /// Err(From::from(
384 /// "expected at least two records but got only one"))
385 /// }
386 /// }
387 /// ```
    #[inline]
    pub fn deserialize<D: 'r>(&'r mut self) -> DeserializeRecordsStream<'r, R, D>
    where
        D: DeserializeOwned,
    {
        DeserializeRecordsStream::new(&mut self.0)
    }
395
    /// Returns a borrowed stream over pairs of a deserialized record and the
    /// position in the reader's stream just before that record was read.
398 ///
399 /// Each item yielded by this stream is a `(Result<D, Error>, Position)`.
400 /// Therefore, in order to access the record, callers must handle the
401 /// possibility of error (typically with `?`).
402 ///
    /// Frequently, turbofish notation is needed when calling this function:
404 /// `rdr.deserialize_with_pos::<RecordType>();`
405 ///
406 /// # Example
407 ///
408 /// This shows how to deserialize CSV data into normal Rust structures. The
409 /// fields of the header row are used to match up the values in each row
410 /// to the fields of the struct.
411 ///
412 /// ```
413 /// # use tokio1 as tokio;
414 /// use std::error::Error;
415 ///
416 /// use csv_async::AsyncDeserializer;
417 /// use serde::Deserialize;
418 /// use tokio_stream::StreamExt;
419 ///
420 /// #[derive(Debug, Deserialize, Eq, PartialEq)]
421 /// struct Row {
422 /// city: String,
423 /// country: String,
424 /// population: u64,
425 /// }
426 ///
427 /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
428 /// async fn example() -> Result<(), Box<dyn Error>> {
429 /// let data = "\
430 /// city,country,population
431 /// Boston,United States,4628910
432 /// Concord,United States,42695
433 /// ";
434 /// let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
435 /// let mut iter = rdr.deserialize_with_pos();
436 ///
437 /// if let Some((result, pos)) = iter.next().await {
438 /// let record: Row = result?;
439 /// assert_eq!(record, Row {
440 /// city: "Boston".to_string(),
441 /// country: "United States".to_string(),
442 /// population: 4628910,
443 /// });
444 /// assert_eq!(pos.byte(), 24);
445 /// assert_eq!(pos.line(), 2);
446 /// assert_eq!(pos.record(), 1);
447 /// } else {
448 /// return Err(From::from("expected at least one record but got none"));
449 /// }
450 /// if let Some((result, pos)) = iter.next().await {
451 /// let record: Row = result?;
452 /// assert_eq!(record, Row {
453 /// city: "Concord".to_string(),
454 /// country: "United States".to_string(),
455 /// population: 42695,
456 /// });
457 /// assert_eq!(pos.byte(), 53);
458 /// assert_eq!(pos.line(), 3);
459 /// assert_eq!(pos.record(), 2);
460 /// } else {
461 /// return Err(From::from("expected at least two records but got one only"));
462 /// }
463 /// assert!(iter.next().await.is_none());
464 /// Ok(())
465 /// }
466 /// ```
    #[inline]
    pub fn deserialize_with_pos<D: 'r>(&'r mut self) -> DeserializeRecordsStreamPos<'r, R, D>
    where
        D: DeserializeOwned,
    {
        DeserializeRecordsStreamPos::new(&mut self.0)
    }
474
    /// Returns an owned stream over deserialized records.
476 ///
477 /// Each item yielded by this stream is a `Result<D, Error>`.
478 /// Therefore, in order to access the record, callers must handle the
479 /// possibility of error (typically with `?`).
480 ///
    /// If `has_headers` was enabled via an `AsyncReaderBuilder` (which is the
482 /// default), then this does not include the first record. Additionally,
483 /// if `has_headers` is enabled, then deserializing into a struct will
484 /// automatically align the values in each row to the fields of a struct
485 /// based on the header row.
486 ///
    /// Frequently, turbofish notation is needed when calling this function:
488 /// `rdr.into_deserialize::<RecordType>();`
489 ///
490 /// # Example
491 ///
492 /// This shows how to deserialize CSV data into normal Rust structs. The
493 /// fields of the header row are used to match up the values in each row
494 /// to the fields of the struct.
495 ///
496 /// ```
497 /// # use tokio1 as tokio;
498 /// use std::error::Error;
499 ///
500 /// use csv_async::AsyncDeserializer;
501 /// use serde::Deserialize;
502 /// use tokio_stream::StreamExt;
503 ///
504 /// #[derive(Debug, Deserialize, Eq, PartialEq)]
505 /// struct Row {
506 /// city: String,
507 /// country: String,
508 /// #[serde(rename = "popcount")]
509 /// population: u64,
510 /// }
511 ///
512 /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
513 /// async fn example() -> Result<(), Box<dyn Error>> {
514 /// let data = "\
515 /// city,country,popcount
516 /// Boston,United States,4628910
517 /// ";
518 /// let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
519 /// let mut iter = rdr.into_deserialize();
520 ///
521 /// if let Some(result) = iter.next().await {
522 /// let record: Row = result?;
523 /// assert_eq!(record, Row {
524 /// city: "Boston".to_string(),
525 /// country: "United States".to_string(),
526 /// population: 4628910,
527 /// });
528 /// Ok(())
529 /// } else {
530 /// Err(From::from("expected at least one record but got none"))
531 /// }
532 /// }
533 /// ```
    #[inline]
    pub fn into_deserialize<D: 'r>(self) -> DeserializeRecordsIntoStream<'r, R, D>
    where
        D: DeserializeOwned,
    {
        DeserializeRecordsIntoStream::new(self.0)
    }
541
    /// Returns an owned stream over pairs of a deserialized record and the
    /// position in the reader's stream just before that record was read.
544 ///
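    /// # Example
    ///
    /// A brief sketch, mirroring the `deserialize_with_pos` example above but
    /// consuming the reader:
    ///
    /// ```
    /// # use tokio1 as tokio;
    /// use std::error::Error;
    ///
    /// use csv_async::AsyncDeserializer;
    /// use serde::Deserialize;
    /// use tokio_stream::StreamExt;
    ///
    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
    /// struct Row {
    ///     city: String,
    ///     country: String,
    ///     pop: u64,
    /// }
    ///
    /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
    /// async fn example() -> Result<(), Box<dyn Error>> {
    ///     let data = "\
    /// city,country,pop
    /// Boston,United States,4628910
    /// ";
    ///     let rdr = AsyncDeserializer::from_reader(data.as_bytes());
    ///     let mut iter = rdr.into_deserialize_with_pos::<Row>();
    ///
    ///     while let Some((result, pos)) = iter.next().await {
    ///         let record: Row = result?;
    ///         println!("{:?} (record starts at byte {})", record, pos.byte());
    ///     }
    ///     Ok(())
    /// }
    /// ```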
    #[inline]
    pub fn into_deserialize_with_pos<D: 'r>(self) -> DeserializeRecordsIntoStreamPos<'r, R, D>
    where
        D: DeserializeOwned,
    {
        DeserializeRecordsIntoStreamPos::new(self.0)
    }
552
553 /// Returns a reference to the first row read by this parser.
554 ///
555 /// If no row has been read yet, then this will force parsing of the first
556 /// row.
557 ///
558 /// If there was a problem parsing the row or if it wasn't valid UTF-8,
559 /// then this returns an error.
560 ///
561 /// If the underlying reader emits EOF before any data, then this returns
562 /// an empty record.
563 ///
564 /// Note that this method may be used regardless of whether `has_headers`
565 /// was enabled (but it is enabled by default).
566 ///
567 /// # Example
568 ///
569 /// This example shows how to get the header row of CSV data. Notice that
570 /// the header row does not appear as a record in the iterator!
571 ///
572 /// ```
573 /// # use tokio1 as tokio;
574 /// use std::error::Error;
575 /// use csv_async::AsyncDeserializer;
576 /// use serde::Deserialize;
577 /// use tokio_stream::StreamExt;
578 ///
579 /// #[derive(Debug, Deserialize, Eq, PartialEq)]
580 /// struct Row {
581 /// city: String,
582 /// country: String,
583 /// pop: u64,
584 /// }
585 ///
586 /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
587 /// async fn example() -> Result<(), Box<dyn Error>> {
588 /// let data = "\
589 /// city,country,pop
590 /// Boston,United States,4628910
591 /// ";
592 /// let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
593 ///
594 /// // We can read the headers before iterating.
595 /// {
596 /// // `headers` borrows from the reader, so we put this in its
597 /// // own scope. That way, the borrow ends before we try iterating
598 /// // below. Alternatively, we could clone the headers.
599 /// let headers = rdr.headers().await?;
600 /// assert_eq!(headers, vec!["city", "country", "pop"]);
601 /// }
602 ///
603 /// {
604 /// let mut records = rdr.deserialize::<Row>();
605 /// assert_eq!(records.next().await.unwrap()?,
606 /// Row {city: "Boston".to_string(),
607 /// country: "United States".to_string(),
608 /// pop: 4628910 });
609 /// assert!(records.next().await.is_none());
610 /// }
611 ///
612 /// // We can also read the headers after iterating.
613 /// let headers = rdr.headers().await?;
614 /// assert_eq!(headers, vec!["city", "country", "pop"]);
615 /// Ok(())
616 /// }
617 /// ```
618 #[inline]
619 pub async fn headers(&mut self) -> Result<&StringRecord> {
620 self.0.headers().await
621 }
622
623 /// Returns a reference to the first row read by this parser as raw bytes.
624 ///
625 /// If no row has been read yet, then this will force parsing of the first
626 /// row.
627 ///
628 /// If there was a problem parsing the row then this returns an error.
629 ///
630 /// If the underlying reader emits EOF before any data, then this returns
631 /// an empty record.
632 ///
633 /// Note that this method may be used regardless of whether `has_headers`
634 /// was enabled (but it is enabled by default).
635 ///
636 /// # Example
637 ///
638 /// This example shows how to get the header row of CSV data. Notice that
639 /// the header row does not appear as a record in the iterator!
640 ///
641 /// ```
642 /// # use tokio1 as tokio;
643 /// use std::error::Error;
644 /// use csv_async::AsyncDeserializer;
645 /// use serde::Deserialize;
646 /// use tokio_stream::StreamExt;
647 ///
648 /// #[derive(Debug, Deserialize, Eq, PartialEq)]
649 /// struct Row {
650 /// city: String,
651 /// country: String,
652 /// #[serde(rename = "pop")]
653 /// population: u64,
654 /// }
655 ///
656 /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
657 /// async fn example() -> Result<(), Box<dyn Error>> {
658 /// let data = indoc::indoc! {"
659 /// city,country,pop
660 /// Boston,United States,4628910
661 /// "};
662 /// let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
663 ///
664 /// // We can read the headers before iterating.
665 /// {
666 /// // `headers` borrows from the reader, so we put this in its
667 /// // own scope. That way, the borrow ends before we try iterating
668 /// // below. Alternatively, we could clone the headers.
669 /// let headers = rdr.byte_headers().await?;
670 /// assert_eq!(headers, vec!["city", "country", "pop"]);
671 /// }
672 ///
673 /// {
674 /// let mut records = rdr.deserialize::<Row>();
675 /// assert_eq!(records.next().await.unwrap()?,
676 /// Row {city: "Boston".to_string(),
677 /// country: "United States".to_string(),
678 /// population: 4628910 });
679 /// assert!(records.next().await.is_none());
680 /// }
681 ///
682 /// // We can also read the headers after iterating.
683 /// let headers = rdr.byte_headers().await?;
684 /// assert_eq!(headers, vec!["city", "country", "pop"]);
685 /// Ok(())
686 /// }
687 /// ```
688 #[inline]
689 pub async fn byte_headers(&mut self) -> Result<&ByteRecord> {
690 self.0.byte_headers().await
691 }
692
693 /// Set the headers of this CSV parser manually.
694 ///
695 /// This overrides any other setting (including `set_byte_headers`). Any
696 /// automatic detection of headers is disabled. This may be called at any
697 /// time.
698 ///
699 /// # Example
700 ///
701 /// ```
702 /// use std::error::Error;
703 /// use csv_async::{AsyncDeserializer, StringRecord};
704 ///
705 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
706 /// async fn example() -> Result<(), Box<dyn Error>> {
707 /// let data = "\
708 /// city,country,pop
709 /// Boston,United States,4628910
710 /// ";
711 /// let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
712 ///
713 /// assert_eq!(rdr.headers().await?, vec!["city", "country", "pop"]);
714 /// rdr.set_headers(StringRecord::from(vec!["a", "b", "c"]));
715 /// assert_eq!(rdr.headers().await?, vec!["a", "b", "c"]);
716 ///
717 /// Ok(())
718 /// }
719 /// ```
720 #[inline]
721 pub fn set_headers(&mut self, headers: StringRecord) {
722 self.0.set_headers(headers);
723 }
724
725 /// Set the headers of this CSV parser manually as raw bytes.
726 ///
727 /// This overrides any other setting (including `set_headers`). Any
728 /// automatic detection of headers is disabled. This may be called at any
729 /// time.
730 ///
731 /// # Example
732 ///
733 /// ```
734 /// use std::error::Error;
735 /// use csv_async::{AsyncDeserializer, ByteRecord};
736 ///
737 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
738 /// async fn example() -> Result<(), Box<dyn Error>> {
739 /// let data = "\
740 /// city,country,pop
741 /// Boston,United States,4628910
742 /// ";
743 /// let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
744 ///
745 /// assert_eq!(rdr.byte_headers().await?, vec!["city", "country", "pop"]);
746 /// rdr.set_byte_headers(ByteRecord::from(vec!["a", "b", "c"]));
747 /// assert_eq!(rdr.byte_headers().await?, vec!["a", "b", "c"]);
748 ///
749 /// Ok(())
750 /// }
751 /// ```
752 #[inline]
753 pub fn set_byte_headers(&mut self, headers: ByteRecord) {
754 self.0.set_byte_headers(headers);
755 }
756
757 /// Read a single row into the given record. Returns false when no more
758 /// records could be read.
759 ///
    /// If `has_headers` was enabled via an `AsyncReaderBuilder` (which is the
761 /// default), then this will never read the first record.
762 ///
    /// This method is useful when you want to read records as fast as
    /// possible. It's less ergonomic than a stream, but it permits the
765 /// caller to reuse the `StringRecord` allocation, which usually results
766 /// in higher throughput.
767 ///
768 /// Records read via this method are guaranteed to have a position set
769 /// on them, even if the reader is at EOF or if an error is returned.
770 ///
771 /// # Example
772 ///
773 /// ```
774 /// use std::error::Error;
775 /// use csv_async::{AsyncDeserializer, StringRecord};
776 ///
777 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
778 /// async fn example() -> Result<(), Box<dyn Error>> {
779 /// let data = "\
780 /// city,country,pop
781 /// Boston,United States,4628910
782 /// ";
783 /// let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
784 /// let mut record = StringRecord::new();
785 ///
786 /// if rdr.read_record(&mut record).await? {
787 /// assert_eq!(record, vec!["Boston", "United States", "4628910"]);
788 /// Ok(())
789 /// } else {
790 /// Err(From::from("expected at least one record but got none"))
791 /// }
792 /// }
793 /// ```
794 #[inline]
795 pub async fn read_record(&mut self, record: &mut StringRecord) -> Result<bool> {
796 self.0.read_record(record).await
797 }
798
799 /// Read a single row into the given byte record. Returns false when no
800 /// more records could be read.
801 ///
    /// If `has_headers` was enabled via an `AsyncReaderBuilder` (which is the
803 /// default), then this will never read the first record.
804 ///
    /// This method is useful when you want to read records as fast as
    /// possible. It's less ergonomic than a stream, but it permits the
807 /// caller to reuse the `ByteRecord` allocation, which usually results
808 /// in higher throughput.
809 ///
810 /// Records read via this method are guaranteed to have a position set
811 /// on them, even if the reader is at EOF or if an error is returned.
812 ///
813 /// # Example
814 ///
815 /// ```
816 /// use std::error::Error;
817 /// use csv_async::{ByteRecord, AsyncDeserializer};
818 ///
819 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
820 /// async fn example() -> Result<(), Box<dyn Error>> {
821 /// let data = "\
822 /// city,country,pop
823 /// Boston,United States,4628910
824 /// ";
825 /// let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
826 /// let mut record = ByteRecord::new();
827 ///
828 /// if rdr.read_byte_record(&mut record).await? {
829 /// assert_eq!(record, vec!["Boston", "United States", "4628910"]);
830 /// Ok(())
831 /// } else {
832 /// Err(From::from("expected at least one record but got none"))
833 /// }
834 /// }
835 /// ```
836 #[inline]
837 pub async fn read_byte_record(&mut self, record: &mut ByteRecord) -> Result<bool> {
838 self.0.read_byte_record(record).await
839 }
840
841 /// Return the current position of this CSV deserializer.
842 ///
    /// Because of borrowing rules, this function can only be used when no
    /// deserializer stream is alive (such a stream mutably borrows the reader).
    /// To know the position during deserialization, use `deserialize_with_pos`,
    /// as shown in the example below.
847 ///
848 /// The byte offset in the position returned can be used to `seek` this
849 /// deserializer. In particular, seeking to a position returned here on the same
850 /// data will result in parsing the same subsequent record.
851 ///
852 /// # Example: reading the position
853 ///
854 /// ```
855 /// # use tokio1 as tokio;
856 /// use std::error::Error;
857 /// use std::io;
858 /// use csv_async::{AsyncDeserializer, Position};
859 /// use serde::Deserialize;
860 /// use tokio_stream::StreamExt;
861 ///
862 /// #[derive(Debug, Deserialize, Eq, PartialEq)]
863 /// struct Row {
864 /// city: String,
865 /// country: String,
866 /// popcount: u64,
867 /// }
868 ///
869 /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
870 /// async fn example() -> Result<(), Box<dyn Error>> {
871 /// let data = "\
872 /// city,country,popcount
873 /// Boston,United States,4628910
874 /// Concord,United States,42695
875 /// ";
876 /// let mut rdr = AsyncDeserializer::from_reader(io::Cursor::new(data));
877 /// let mut iter = rdr.deserialize_with_pos::<Row>();
878 /// let mut pos_at_boston = Position::new();
879 /// while let Some((rec, pos)) = iter.next().await {
880 /// if rec?.city == "Boston" {
881 /// pos_at_boston = pos;
882 /// }
883 /// }
884 /// drop(iter); // releases rdr borrow by iter
885 /// let pos_at_end = rdr.position();
886 ///
887 /// assert_eq!(pos_at_boston.byte(), 22);
888 /// assert_eq!(pos_at_boston.line(), 2);
889 /// assert_eq!(pos_at_boston.record(), 1);
890 /// assert_eq!(pos_at_end.byte(), 79);
891 /// assert_eq!(pos_at_end.line(), 4);
892 /// assert_eq!(pos_at_end.record(), 3);
893 /// Ok(())
894 /// }
895 /// ```
896 #[inline]
897 pub fn position(&self) -> &Position {
898 self.0.position()
899 }
900
901 /// Returns true if and only if this reader has been exhausted.
902 ///
903 /// When this returns true, no more records can be read from this reader.
904 ///
905 /// # Example
906 ///
907 /// ```
908 /// # use tokio1 as tokio;
909 /// use std::error::Error;
910 /// use csv_async::{AsyncDeserializer, Position};
911 /// use serde::Deserialize;
912 /// use tokio_stream::StreamExt;
913 ///
914 /// #[derive(Debug, Deserialize, Eq, PartialEq)]
915 /// struct Row {
916 /// city: String,
917 /// country: String,
918 /// popcount: u64,
919 /// }
920 ///
921 /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
922 /// async fn example() -> Result<(), Box<dyn Error>> {
923 /// let data = "\
924 /// city,country,popcount
925 /// Boston,United States,4628910
926 /// Concord,United States,42695
927 /// ";
928 /// let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
929 /// assert!(!rdr.is_done());
930 /// {
931 /// let mut records = rdr.deserialize::<Row>();
932 /// while let Some(record) = records.next().await {
933 /// let _ = record?;
934 /// }
935 /// }
936 /// assert!(rdr.is_done());
937 /// Ok(())
938 /// }
939 /// ```
940 #[inline]
941 pub fn is_done(&self) -> bool {
942 self.0.is_done()
943 }
944
945 /// Returns true if and only if this reader has been configured to
946 /// interpret the first record as a header record.
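    ///
    /// # Example
    ///
    /// A minimal sketch showing the default and a builder-configured reader:
    ///
    /// ```
    /// use csv_async::{AsyncDeserializer, AsyncReaderBuilder};
    ///
    /// let rdr = AsyncDeserializer::from_reader("a,b,c".as_bytes());
    /// assert!(rdr.has_headers());
    ///
    /// let rdr = AsyncReaderBuilder::new()
    ///     .has_headers(false)
    ///     .create_deserializer("a,b,c".as_bytes());
    /// assert!(!rdr.has_headers());
    /// ```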
947 #[inline]
948 pub fn has_headers(&self) -> bool {
949 self.0.has_headers()
950 }
951
952 /// Returns a reference to the underlying reader.
953 #[inline]
954 pub fn get_ref(&self) -> &R {
955 self.0.get_ref()
956 }
957
958 /// Returns a mutable reference to the underlying reader.
959 #[inline]
960 pub fn get_mut(&mut self) -> &mut R {
961 self.0.get_mut()
962 }
963
964 /// Unwraps this CSV reader, returning the underlying reader.
965 ///
966 /// Note that any leftover data inside this reader's internal buffer is
967 /// lost.
968 #[inline]
969 pub fn into_inner(self) -> R {
970 self.0.into_inner()
971 }
972}
973
974
975#[cfg(test)]
976mod tests {
977 use std::pin::Pin;
978 use std::task::{Context, Poll};
979
980 use tokio::io;
981 use tokio_stream::StreamExt;
982 use serde::Deserialize;
983 use tokio::runtime::Runtime;
984
985 use crate::byte_record::ByteRecord;
986 use crate::error::ErrorKind;
987 use crate::string_record::StringRecord;
988 use crate::Trim;
989
990 use super::{Position, AsyncReaderBuilder, AsyncDeserializer};
991
992 fn b(s: &str) -> &[u8] {
993 s.as_bytes()
994 }
995 fn s(b: &[u8]) -> &str {
996 ::std::str::from_utf8(b).unwrap()
997 }
998
999 fn newpos(byte: u64, line: u64, record: u64) -> Position {
1000 let mut p = Position::new();
1001 p.set_byte(byte).set_line(line).set_record(record);
1002 p
1003 }
1004
    async fn count(stream: impl StreamExt) -> usize {
        stream.fold(0, |acc, _| acc + 1).await
    }
1008
1009 #[tokio::test]
1010 async fn read_byte_record() {
1011 let data = b("foo,\"b,ar\",baz\nabc,mno,xyz");
1012 let mut rdr =
1013 AsyncReaderBuilder::new().has_headers(false).create_deserializer(data);
1014 let mut rec = ByteRecord::new();
1015
1016 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1017 assert_eq!(3, rec.len());
1018 assert_eq!("foo", s(&rec[0]));
1019 assert_eq!("b,ar", s(&rec[1]));
1020 assert_eq!("baz", s(&rec[2]));
1021
1022 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1023 assert_eq!(3, rec.len());
1024 assert_eq!("abc", s(&rec[0]));
1025 assert_eq!("mno", s(&rec[1]));
1026 assert_eq!("xyz", s(&rec[2]));
1027
1028 assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
1029 }
1030
1031 #[tokio::test]
1032 async fn read_trimmed_records_and_headers() {
1033 let data = b("foo, bar,\tbaz\n 1, 2, 3\n1\t,\t,3\t\t");
1034 let mut rdr = AsyncReaderBuilder::new()
1035 .has_headers(true)
1036 .trim(Trim::All)
1037 .create_deserializer(data);
1038 let mut rec = ByteRecord::new();
1039 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1040 assert_eq!("1", s(&rec[0]));
1041 assert_eq!("2", s(&rec[1]));
1042 assert_eq!("3", s(&rec[2]));
1043 let mut rec = StringRecord::new();
1044 assert!(rdr.read_record(&mut rec).await.unwrap());
1045 assert_eq!("1", &rec[0]);
1046 assert_eq!("", &rec[1]);
1047 assert_eq!("3", &rec[2]);
1048 {
1049 let headers = rdr.headers().await.unwrap();
1050 assert_eq!(3, headers.len());
1051 assert_eq!("foo", &headers[0]);
1052 assert_eq!("bar", &headers[1]);
1053 assert_eq!("baz", &headers[2]);
1054 }
1055 }
1056
1057 #[tokio::test]
1058 async fn read_trimmed_header() {
1059 let data = b("foo, bar,\tbaz\n 1, 2, 3\n1\t,\t,3\t\t");
1060 let mut rdr = AsyncReaderBuilder::new()
1061 .has_headers(true)
1062 .trim(Trim::Headers)
1063 .create_deserializer(data);
1064 let mut rec = ByteRecord::new();
1065 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1066 assert_eq!(" 1", s(&rec[0]));
1067 assert_eq!(" 2", s(&rec[1]));
1068 assert_eq!(" 3", s(&rec[2]));
1069 {
1070 let headers = rdr.headers().await.unwrap();
1071 assert_eq!(3, headers.len());
1072 assert_eq!("foo", &headers[0]);
1073 assert_eq!("bar", &headers[1]);
1074 assert_eq!("baz", &headers[2]);
1075 }
1076 }
1077
1078 #[tokio::test]
    async fn read_trimmed_header_invalid_utf8() {
1080 let data = &b"foo, b\xFFar,\tbaz\na,b,c\nd,e,f"[..];
1081 let mut rdr = AsyncReaderBuilder::new()
1082 .has_headers(true)
1083 .trim(Trim::Headers)
1084 .create_deserializer(data);
1085 let mut rec = StringRecord::new();
1086
1087 // force the headers to be read
1088 let _ = rdr.read_record(&mut rec).await;
1089 // Check the byte headers are trimmed
1090 {
1091 let headers = rdr.byte_headers().await.unwrap();
1092 assert_eq!(3, headers.len());
1093 assert_eq!(b"foo", &headers[0]);
1094 assert_eq!(b"b\xFFar", &headers[1]);
1095 assert_eq!(b"baz", &headers[2]);
1096 }
1097 match *rdr.headers().await.unwrap_err().kind() {
1098 ErrorKind::Utf8 { pos: Some(ref pos), ref err } => {
1099 assert_eq!(pos, &newpos(0, 1, 0));
1100 assert_eq!(err.field(), 1);
1101 assert_eq!(err.valid_up_to(), 3);
1102 }
1103 ref err => panic!("match failed, got {:?}", err),
1104 }
1105 }
1106
1107 #[tokio::test]
1108 async fn read_trimmed_records() {
1109 let data = b("foo, bar,\tbaz\n 1, 2, 3\n1\t,\t,3\t\t");
1110 let mut rdr = AsyncReaderBuilder::new()
1111 .has_headers(true)
1112 .trim(Trim::Fields)
1113 .create_deserializer(data);
1114 let mut rec = ByteRecord::new();
1115 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1116 assert_eq!("1", s(&rec[0]));
1117 assert_eq!("2", s(&rec[1]));
1118 assert_eq!("3", s(&rec[2]));
1119 {
1120 let headers = rdr.headers().await.unwrap();
1121 assert_eq!(3, headers.len());
1122 assert_eq!("foo", &headers[0]);
1123 assert_eq!(" bar", &headers[1]);
1124 assert_eq!("\tbaz", &headers[2]);
1125 }
1126 }
1127
1128 #[tokio::test]
1129 async fn read_record_unequal_fails() {
1130 let data = b("foo\nbar,baz");
1131 let mut rdr =
1132 AsyncReaderBuilder::new().has_headers(false).create_deserializer(data);
1133 let mut rec = ByteRecord::new();
1134
1135 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1136 assert_eq!(1, rec.len());
1137 assert_eq!("foo", s(&rec[0]));
1138
1139 match rdr.read_byte_record(&mut rec).await {
1140 Err(err) => match *err.kind() {
1141 ErrorKind::UnequalLengths {
1142 expected_len: 1,
1143 ref pos,
1144 len: 2,
1145 } => {
1146 assert_eq!(pos, &Some(newpos(4, 2, 1)));
1147 }
1148 ref wrong => panic!("match failed, got {:?}", wrong),
1149 },
1150 wrong => panic!("match failed, got {:?}", wrong),
1151 }
1152 }
1153
1154 #[tokio::test]
1155 async fn read_record_unequal_ok() {
1156 let data = b("foo\nbar,baz");
1157 let mut rdr = AsyncReaderBuilder::new()
1158 .has_headers(false)
1159 .flexible(true)
1160 .create_deserializer(data);
1161 let mut rec = ByteRecord::new();
1162
1163 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1164 assert_eq!(1, rec.len());
1165 assert_eq!("foo", s(&rec[0]));
1166
1167 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1168 assert_eq!(2, rec.len());
1169 assert_eq!("bar", s(&rec[0]));
1170 assert_eq!("baz", s(&rec[1]));
1171
1172 assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
1173 }
1174
1175 // This tests that even if we get a CSV error, we can continue reading
1176 // if we want.
1177 #[tokio::test]
1178 async fn read_record_unequal_continue() {
1179 let data = b("foo\nbar,baz\nquux");
1180 let mut rdr =
1181 AsyncReaderBuilder::new().has_headers(false).create_deserializer(data);
1182 let mut rec = ByteRecord::new();
1183
1184 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1185 assert_eq!(1, rec.len());
1186 assert_eq!("foo", s(&rec[0]));
1187
1188 match rdr.read_byte_record(&mut rec).await {
1189 Err(err) => match err.kind() {
1190 &ErrorKind::UnequalLengths {
1191 expected_len: 1,
1192 ref pos,
1193 len: 2,
1194 } => {
1195 assert_eq!(pos, &Some(newpos(4, 2, 1)));
1196 }
1197 wrong => panic!("match failed, got {:?}", wrong),
1198 },
1199 wrong => panic!("match failed, got {:?}", wrong),
1200 }
1201
1202 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1203 assert_eq!(1, rec.len());
1204 assert_eq!("quux", s(&rec[0]));
1205
1206 assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
1207 }
1208
1209 #[tokio::test]
1210 async fn read_record_headers() {
1211 let data = b("foo,bar,baz\na,b,c\nd,e,f");
1212 let mut rdr = AsyncReaderBuilder::new().has_headers(true).create_deserializer(data);
1213 let mut rec = StringRecord::new();
1214
1215 assert!(rdr.read_record(&mut rec).await.unwrap());
1216 assert_eq!(3, rec.len());
1217 assert_eq!("a", &rec[0]);
1218
1219 assert!(rdr.read_record(&mut rec).await.unwrap());
1220 assert_eq!(3, rec.len());
1221 assert_eq!("d", &rec[0]);
1222
1223 assert!(!rdr.read_record(&mut rec).await.unwrap());
1224
1225 {
1226 let headers = rdr.byte_headers().await.unwrap();
1227 assert_eq!(3, headers.len());
1228 assert_eq!(b"foo", &headers[0]);
1229 assert_eq!(b"bar", &headers[1]);
1230 assert_eq!(b"baz", &headers[2]);
1231 }
1232 {
1233 let headers = rdr.headers().await.unwrap();
1234 assert_eq!(3, headers.len());
1235 assert_eq!("foo", &headers[0]);
1236 assert_eq!("bar", &headers[1]);
1237 assert_eq!("baz", &headers[2]);
1238 }
1239 }
1240
1241 #[tokio::test]
1242 async fn read_record_headers_invalid_utf8() {
1243 let data = &b"foo,b\xFFar,baz\na,b,c\nd,e,f"[..];
1244 let mut rdr = AsyncReaderBuilder::new().has_headers(true).create_deserializer(data);
1245 let mut rec = StringRecord::new();
1246
1247 assert!(rdr.read_record(&mut rec).await.unwrap());
1248 assert_eq!(3, rec.len());
1249 assert_eq!("a", &rec[0]);
1250
1251 assert!(rdr.read_record(&mut rec).await.unwrap());
1252 assert_eq!(3, rec.len());
1253 assert_eq!("d", &rec[0]);
1254
1255 assert!(!rdr.read_record(&mut rec).await.unwrap());
1256
1257 // Check that we can read the headers as raw bytes, but that
1258 // if we read them as strings, we get an appropriate UTF-8 error.
1259 {
1260 let headers = rdr.byte_headers().await.unwrap();
1261 assert_eq!(3, headers.len());
1262 assert_eq!(b"foo", &headers[0]);
1263 assert_eq!(b"b\xFFar", &headers[1]);
1264 assert_eq!(b"baz", &headers[2]);
1265 }
1266 match *rdr.headers().await.unwrap_err().kind() {
1267 ErrorKind::Utf8 { pos: Some(ref pos), ref err } => {
1268 assert_eq!(pos, &newpos(0, 1, 0));
1269 assert_eq!(err.field(), 1);
1270 assert_eq!(err.valid_up_to(), 1);
1271 }
1272 ref err => panic!("match failed, got {:?}", err),
1273 }
1274 }
1275
1276 #[tokio::test]
1277 async fn read_record_no_headers_before() {
1278 let data = b("foo,bar,baz\na,b,c\nd,e,f");
1279 let mut rdr =
1280 AsyncReaderBuilder::new().has_headers(false).create_deserializer(data);
1281 let mut rec = StringRecord::new();
1282
1283 {
1284 let headers = rdr.headers().await.unwrap();
1285 assert_eq!(3, headers.len());
1286 assert_eq!("foo", &headers[0]);
1287 assert_eq!("bar", &headers[1]);
1288 assert_eq!("baz", &headers[2]);
1289 }
1290
1291 assert!(rdr.read_record(&mut rec).await.unwrap());
1292 assert_eq!(3, rec.len());
1293 assert_eq!("foo", &rec[0]);
1294
1295 assert!(rdr.read_record(&mut rec).await.unwrap());
1296 assert_eq!(3, rec.len());
1297 assert_eq!("a", &rec[0]);
1298
1299 assert!(rdr.read_record(&mut rec).await.unwrap());
1300 assert_eq!(3, rec.len());
1301 assert_eq!("d", &rec[0]);
1302
1303 assert!(!rdr.read_record(&mut rec).await.unwrap());
1304 }
1305
1306 #[tokio::test]
1307 async fn read_record_no_headers_after() {
1308 let data = b("foo,bar,baz\na,b,c\nd,e,f");
1309 let mut rdr =
1310 AsyncReaderBuilder::new().has_headers(false).create_deserializer(data);
1311 let mut rec = StringRecord::new();
1312
1313 assert!(rdr.read_record(&mut rec).await.unwrap());
1314 assert_eq!(3, rec.len());
1315 assert_eq!("foo", &rec[0]);
1316
1317 assert!(rdr.read_record(&mut rec).await.unwrap());
1318 assert_eq!(3, rec.len());
1319 assert_eq!("a", &rec[0]);
1320
1321 assert!(rdr.read_record(&mut rec).await.unwrap());
1322 assert_eq!(3, rec.len());
1323 assert_eq!("d", &rec[0]);
1324
1325 assert!(!rdr.read_record(&mut rec).await.unwrap());
1326
1327 let headers = rdr.headers().await.unwrap();
1328 assert_eq!(3, headers.len());
1329 assert_eq!("foo", &headers[0]);
1330 assert_eq!("bar", &headers[1]);
1331 assert_eq!("baz", &headers[2]);
1332 }
1333
1334 #[derive(Debug, Deserialize, Eq, PartialEq)]
1335 struct Row1([String; 3]);
1336
    // Test that position info is reported correctly in the absence of headers.
1338 #[tokio::test]
1339 async fn positions_no_headers() {
1340 let mut rdr = AsyncReaderBuilder::new()
1341 .has_headers(false)
1342 .create_deserializer("a,b,c\nx,y,z".as_bytes())
1343 .into_deserialize_with_pos::<Row1>();
1344
1345 let (_, pos) = rdr.next().await.unwrap();
1346 assert_eq!(pos.byte(), 0);
1347 assert_eq!(pos.line(), 1);
1348 assert_eq!(pos.record(), 0);
1349
1350 let (_, pos) = rdr.next().await.unwrap();
1351 assert_eq!(pos.byte(), 6);
1352 assert_eq!(pos.line(), 2);
1353 assert_eq!(pos.record(), 1);
1354
1355 // Test that we are at end of stream, and properly signal this.
1356 assert!(rdr.next().await.is_none());
        // Test that we do not panic when trying to read past the end of the stream (Issue#22)
1358 assert!(rdr.next().await.is_none());
1359 }
1360
1361 // Test that position info is reported correctly with headers.
1362 #[tokio::test]
1363 async fn positions_headers() {
1364 let mut rdr = AsyncReaderBuilder::new()
1365 .has_headers(true)
1366 .create_deserializer("a,b,c\nx,y,z".as_bytes())
1367 .into_deserialize_with_pos::<Row1>();
1368
1369 let (_, pos) = rdr.next().await.unwrap();
1370 assert_eq!(pos.byte(), 6);
1371 assert_eq!(pos.line(), 2);
        // We could choose not to count the header as a record, but we keep compatibility with the 'csv' crate.
1373 assert_eq!(pos.record(), 1);
1374 }
1375
1376 // Test that reading headers on empty data yields an empty record.
1377 #[tokio::test]
1378 async fn headers_on_empty_data() {
1379 let mut rdr = AsyncReaderBuilder::new().create_deserializer("".as_bytes());
1380 let r = rdr.byte_headers().await.unwrap();
1381 assert_eq!(r.len(), 0);
1382 }
1383
1384 // Test that reading the first record on empty data works.
1385 #[tokio::test]
1386 async fn no_headers_on_empty_data() {
1387 let mut rdr =
1388 AsyncReaderBuilder::new().has_headers(false).create_deserializer("".as_bytes());
1389 assert_eq!(count(rdr.deserialize::<Row1>()).await, 0);
1390 }
1391
1392 // Test that reading the first record on empty data works, even if
1393 // we've tried to read headers before hand.
1394 #[tokio::test]
1395 async fn no_headers_on_empty_data_after_headers() {
1396 let mut rdr =
1397 AsyncReaderBuilder::new().has_headers(false).create_deserializer("".as_bytes());
1398 assert_eq!(rdr.headers().await.unwrap().len(), 0);
1399 assert_eq!(count(rdr.deserialize::<Row1>()).await, 0);
1400 }
1401
1402 #[test]
1403 fn no_infinite_loop_on_io_errors() {
1404 struct FailingRead;
1405 impl io::AsyncRead for FailingRead {
1406 fn poll_read(
1407 self: Pin<&mut Self>,
1408 _cx: &mut Context,
1409 _buf: &mut tokio::io::ReadBuf
1410 ) -> Poll<Result<(), io::Error>> {
1411 Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "Broken reader")))
1412 }
1413 }
1414 impl Unpin for FailingRead {}
1415
1416 #[derive(Deserialize)]
1417 struct Fake;
1418
1419 Runtime::new().unwrap().block_on(async {
1420 let mut record_results = AsyncDeserializer::from_reader(FailingRead).into_deserialize::<Fake>();
1421 let first_result = record_results.next().await;
1422 assert!(
1423 matches!(&first_result, Some(Err(e)) if matches!(e.kind(), crate::ErrorKind::Io(_)))
1424 );
1425 assert!(record_results.next().await.is_none());
1426 });
1427 }
1428}