csv_async/async_readers/ardr_tokio.rs
1use tokio::io;
2
3use crate::AsyncReaderBuilder;
4use crate::byte_record::{ByteRecord, Position};
5use crate::error::Result;
6use crate::string_record::StringRecord;
7use super::{
8 AsyncReaderImpl,
9 StringRecordsStream, StringRecordsIntoStream,
10 ByteRecordsStream, ByteRecordsIntoStream,
11};
12
13impl AsyncReaderBuilder {
14 /// Build a CSV reader from this configuration that reads data from `rdr`.
15 ///
16 /// Note that the CSV reader is buffered automatically, so you should not
17 /// wrap `rdr` in a buffered reader.
18 ///
19 /// # Example
20 ///
21 /// ```
22 /// use std::error::Error;
23 /// use csv_async::AsyncReaderBuilder;
24 /// use tokio_stream::StreamExt;
25 ///
26 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
27 /// async fn example() -> Result<(), Box<dyn Error>> {
28 /// let data = "\
29 /// city,country,pop
30 /// Boston,United States,4628910
31 /// Concord,United States,42695
32 /// ";
33 /// let mut rdr = AsyncReaderBuilder::new().create_reader(data.as_bytes());
34 /// let mut records = rdr.into_records();
35 /// while let Some(record) = records.next().await {
36 /// println!("{:?}", record?);
37 /// }
38 /// Ok(())
39 /// }
40 /// ```
41 pub fn create_reader<R: io::AsyncRead + Unpin + Send>(&self, rdr: R) -> AsyncReader<R> {
42 AsyncReader::new(self, rdr)
43 }
44}
45
/// An already configured CSV reader for the `tokio` runtime.
///
/// A CSV reader takes as input CSV data and transforms that into standard Rust
/// values. The reader reads CSV data as a sequence of records,
/// where a record is a sequence of fields and each field is a string.
///
/// # Configuration
///
/// A CSV reader has a convenient constructor method, `from_reader`.
/// However, if you want to configure the CSV reader to use
/// a different delimiter or quote character (among many other things), then
/// you should use an [`AsyncReaderBuilder`](struct.AsyncReaderBuilder.html) to construct
/// an `AsyncReader`. For example, to change the field delimiter:
///
/// ```
/// use std::error::Error;
/// use csv_async::AsyncReaderBuilder;
/// use tokio_stream::StreamExt;
///
/// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
/// async fn example() -> Result<(), Box<dyn Error>> {
///     let data = "\
/// city;country;pop
/// Boston;United States;4628910
/// ";
///     let mut rdr = AsyncReaderBuilder::new()
///         .delimiter(b';')
///         .create_reader(data.as_bytes());
///
///     let mut records = rdr.records();
///     assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
///     Ok(())
/// }
/// ```
///
/// # Error handling
///
/// In general, CSV *parsing* does not ever return an error. That is, there is
/// no such thing as malformed CSV data. Instead, this reader will prioritize
/// finding a parse over rejecting CSV data that it does not understand. This
/// choice was inspired by other popular CSV parsers, but also because it is
/// pragmatic. CSV data varies wildly, so even if the CSV data is malformed,
/// it might still be possible to work with the data. In the land of CSV, there
/// is no "right" or "wrong," only "right" and "less right."
///
/// With that said, a number of errors can occur while reading CSV data:
///
/// * By default, all records in CSV data must have the same number of fields.
///   If a record is found with a different number of fields than a prior
///   record, then an error is returned. This behavior can be disabled by
///   enabling flexible parsing via the `flexible` method on
///   [`AsyncReaderBuilder`](struct.AsyncReaderBuilder.html).
/// * When reading CSV data from a resource (like a file), it is possible for
///   reading from the underlying resource to fail. This will return an error.
///   For subsequent calls to the reader after encountering such an error
///   (unless `seek` is used), it will behave as if end of file had been
///   reached, in order to avoid running into infinite loops when still
///   attempting to read the next record when one has errored.
/// * When reading CSV data into `String` or `&str` fields (e.g., via a
///   [`StringRecord`](struct.StringRecord.html)), UTF-8 is strictly
///   enforced. If CSV data is invalid UTF-8, then an error is returned. If
///   you want to read invalid UTF-8, then you should use the byte oriented
///   APIs such as [`ByteRecord`](struct.ByteRecord.html). If you need explicit
///   support for another encoding entirely, then you'll need to use another
///   crate to transcode your CSV data to UTF-8 before parsing it.
/// * When using Serde to deserialize CSV data into Rust types, it is possible
///   for a number of additional errors to occur. For example, deserializing
///   a field `xyz` into an `i32` field will result in an error.
///
/// For more details on the precise semantics of errors, see the
/// [`Error`](enum.Error.html) type.
#[derive(Debug)]
pub struct AsyncReader<R>(AsyncReaderImpl<R>);
119
120impl<'r, R> AsyncReader<R>
121where
122 R: io::AsyncRead + Unpin + Send + 'r,
123{
124 /// Create a new CSV reader given a builder and a source of underlying
125 /// bytes.
126 fn new(builder: &AsyncReaderBuilder, rdr: R) -> AsyncReader<R> {
127 AsyncReader(AsyncReaderImpl::new(builder, rdr))
128 }
129
130 /// Create a new CSV parser with a default configuration for the given
131 /// reader.
132 ///
133 /// To customize CSV parsing, use a `ReaderBuilder`.
134 ///
135 /// # Example
136 ///
137 /// ```
138 /// use std::error::Error;
139 /// use csv_async::AsyncReader;
140 /// use tokio_stream::StreamExt;
141 ///
142 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
143 /// async fn example() -> Result<(), Box<dyn Error>> {
144 /// let data = "\
145 /// city,country,pop
146 /// Boston,United States,4628910
147 /// Concord,United States,42695
148 /// ";
149 /// let mut rdr = AsyncReader::from_reader(data.as_bytes());
150 /// let mut records = rdr.into_records();
151 /// while let Some(record) = records.next().await {
152 /// println!("{:?}", record?);
153 /// }
154 /// Ok(())
155 /// }
156 /// ```
157 #[inline]
158 pub fn from_reader(rdr: R) -> AsyncReader<R> {
159 AsyncReaderBuilder::new().create_reader(rdr)
160 }
161
162 /// Returns a borrowed iterator over all records as strings.
163 ///
164 /// Each item yielded by this iterator is a `Result<StringRecord, Error>`.
165 /// Therefore, in order to access the record, callers must handle the
166 /// possibility of error (typically with `try!` or `?`).
167 ///
168 /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
169 /// default), then this does not include the first record.
170 ///
171 /// # Example
172 ///
173 /// ```
174 /// use std::error::Error;
175 /// use csv_async::AsyncReader;
176 /// use tokio_stream::StreamExt;
177 ///
178 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
179 /// async fn example() -> Result<(), Box<dyn Error>> {
180 /// let data = "\
181 /// city,country,pop
182 /// Boston,United States,4628910
183 /// ";
184 /// let mut rdr = AsyncReader::from_reader(data.as_bytes());
185 /// let mut records = rdr.records();
186 /// while let Some(record) = records.next().await {
187 /// println!("{:?}", record?);
188 /// }
189 /// Ok(())
190 /// }
191 /// ```
192 #[inline]
193 pub fn records(&mut self) -> StringRecordsStream<R> {
194 StringRecordsStream::new(&mut self.0)
195 }
196
197 /// Returns an owned iterator over all records as strings.
198 ///
199 /// Each item yielded by this iterator is a `Result<StringRecord, Error>`.
200 /// Therefore, in order to access the record, callers must handle the
201 /// possibility of error (typically with `try!` or `?`).
202 ///
203 /// This is mostly useful when you want to return a CSV iterator or store
204 /// it somewhere.
205 ///
206 /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
207 /// default), then this does not include the first record.
208 ///
209 /// # Example
210 ///
211 /// ```
212 /// use std::error::Error;
213 /// use csv_async::AsyncReader;
214 /// use tokio_stream::StreamExt;
215 ///
216 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
217 /// async fn example() -> Result<(), Box<dyn Error>> {
218 /// let data = "\
219 /// city,country,pop
220 /// Boston,United States,4628910
221 /// ";
222 /// let rdr = AsyncReader::from_reader(data.as_bytes());
223 /// let mut records = rdr.into_records();
224 /// while let Some(record) = records.next().await {
225 /// println!("{:?}", record?);
226 /// }
227 /// Ok(())
228 /// }
229 /// ```
230 #[inline]
231 pub fn into_records(self) -> StringRecordsIntoStream<'r, R> {
232 StringRecordsIntoStream::new(self.0)
233 }
234
235 /// Returns a borrowed iterator over all records as raw bytes.
236 ///
237 /// Each item yielded by this iterator is a `Result<ByteRecord, Error>`.
238 /// Therefore, in order to access the record, callers must handle the
239 /// possibility of error (typically with `try!` or `?`).
240 ///
241 /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
242 /// default), then this does not include the first record.
243 ///
244 /// # Example
245 ///
246 /// ```
247 /// use std::error::Error;
248 /// use csv_async::AsyncReader;
249 /// use tokio_stream::StreamExt;
250 ///
251 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
252 /// async fn example() -> Result<(), Box<dyn Error>> {
253 /// let data = "\
254 /// city,country,pop
255 /// Boston,United States,4628910
256 /// ";
257 /// let mut rdr = AsyncReader::from_reader(data.as_bytes());
258 /// let mut iter = rdr.byte_records();
259 /// assert_eq!(iter.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
260 /// assert!(iter.next().await.is_none());
261 /// Ok(())
262 /// }
263 /// ```
264 #[inline]
265 pub fn byte_records(&mut self) -> ByteRecordsStream<R> {
266 ByteRecordsStream::new(&mut self.0)
267 }
268
269 /// Returns an owned iterator over all records as raw bytes.
270 ///
271 /// Each item yielded by this iterator is a `Result<ByteRecord, Error>`.
272 /// Therefore, in order to access the record, callers must handle the
273 /// possibility of error (typically with `try!` or `?`).
274 ///
275 /// This is mostly useful when you want to return a CSV iterator or store
276 /// it somewhere.
277 ///
278 /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
279 /// default), then this does not include the first record.
280 ///
281 /// # Example
282 ///
283 /// ```
284 /// use std::error::Error;
285 /// use csv_async::AsyncReader;
286 /// use tokio_stream::StreamExt;
287 ///
288 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
289 /// async fn example() -> Result<(), Box<dyn Error>> {
290 /// let data = "\
291 /// city,country,pop
292 /// Boston,United States,4628910
293 /// ";
294 /// let rdr = AsyncReader::from_reader(data.as_bytes());
295 /// let mut iter = rdr.into_byte_records();
296 /// assert_eq!(iter.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
297 /// assert!(iter.next().await.is_none());
298 /// Ok(())
299 /// }
300 /// ```
301 #[inline]
302 pub fn into_byte_records(self) -> ByteRecordsIntoStream<'r, R> {
303 ByteRecordsIntoStream::new(self.0)
304 }
305
306 /// Returns a reference to the first row read by this parser.
307 ///
308 /// If no row has been read yet, then this will force parsing of the first
309 /// row.
310 ///
311 /// If there was a problem parsing the row or if it wasn't valid UTF-8,
312 /// then this returns an error.
313 ///
314 /// If the underlying reader emits EOF before any data, then this returns
315 /// an empty record.
316 ///
317 /// Note that this method may be used regardless of whether `has_headers`
318 /// was enabled (but it is enabled by default).
319 ///
320 /// # Example
321 ///
322 /// This example shows how to get the header row of CSV data. Notice that
323 /// the header row does not appear as a record in the iterator!
324 ///
325 /// ```
326 /// use std::error::Error;
327 /// use csv_async::AsyncReader;
328 /// use tokio_stream::StreamExt;
329 ///
330 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
331 /// async fn example() -> Result<(), Box<dyn Error>> {
332 /// let data = "\
333 /// city,country,pop
334 /// Boston,United States,4628910
335 /// ";
336 /// let mut rdr = AsyncReader::from_reader(data.as_bytes());
337 ///
338 /// // We can read the headers before iterating.
339 /// {
340 /// // `headers` borrows from the reader, so we put this in its
341 /// // own scope. That way, the borrow ends before we try iterating
342 /// // below. Alternatively, we could clone the headers.
343 /// let headers = rdr.headers().await?;
344 /// assert_eq!(headers, vec!["city", "country", "pop"]);
345 /// }
346 ///
347 /// {
348 /// let mut records = rdr.records();
349 /// assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
350 /// assert!(records.next().await.is_none());
351 /// }
352 ///
353 /// // We can also read the headers after iterating.
354 /// let headers = rdr.headers().await?;
355 /// assert_eq!(headers, vec!["city", "country", "pop"]);
356 /// Ok(())
357 /// }
358 /// ```
359 #[inline]
360 pub async fn headers(&mut self) -> Result<&StringRecord> {
361 self.0.headers().await
362 }
363
364 /// Returns a reference to the first row read by this parser as raw bytes.
365 ///
366 /// If no row has been read yet, then this will force parsing of the first
367 /// row.
368 ///
369 /// If there was a problem parsing the row then this returns an error.
370 ///
371 /// If the underlying reader emits EOF before any data, then this returns
372 /// an empty record.
373 ///
374 /// Note that this method may be used regardless of whether `has_headers`
375 /// was enabled (but it is enabled by default).
376 ///
377 /// # Example
378 ///
379 /// This example shows how to get the header row of CSV data. Notice that
380 /// the header row does not appear as a record in the iterator!
381 ///
382 /// ```
383 /// use std::error::Error;
384 /// use csv_async::AsyncReader;
385 /// use tokio_stream::StreamExt;
386 ///
387 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
388 /// async fn example() -> Result<(), Box<dyn Error>> {
389 /// let data = "\
390 /// city,country,pop
391 /// Boston,United States,4628910
392 /// ";
393 /// let mut rdr = AsyncReader::from_reader(data.as_bytes());
394 ///
395 /// // We can read the headers before iterating.
396 /// {
397 /// // `headers` borrows from the reader, so we put this in its
398 /// // own scope. That way, the borrow ends before we try iterating
399 /// // below. Alternatively, we could clone the headers.
400 /// let headers = rdr.byte_headers().await?;
401 /// assert_eq!(headers, vec!["city", "country", "pop"]);
402 /// }
403 ///
404 /// {
405 /// let mut records = rdr.byte_records();
406 /// assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
407 /// assert!(records.next().await.is_none());
408 /// }
409 ///
410 /// // We can also read the headers after iterating.
411 /// let headers = rdr.byte_headers().await?;
412 /// assert_eq!(headers, vec!["city", "country", "pop"]);
413 /// Ok(())
414 /// }
415 /// ```
416 #[inline]
417 pub async fn byte_headers(&mut self) -> Result<&ByteRecord> {
418 self.0.byte_headers().await
419 }
420
421 /// Set the headers of this CSV parser manually.
422 ///
423 /// This overrides any other setting (including `set_byte_headers`). Any
424 /// automatic detection of headers is disabled. This may be called at any
425 /// time.
426 ///
427 /// # Example
428 ///
429 /// ```
430 /// use std::error::Error;
431 /// use csv_async::{AsyncReader, StringRecord};
432 ///
433 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
434 /// async fn example() -> Result<(), Box<dyn Error>> {
435 /// let data = "\
436 /// city,country,pop
437 /// Boston,United States,4628910
438 /// ";
439 /// let mut rdr = AsyncReader::from_reader(data.as_bytes());
440 ///
441 /// assert_eq!(rdr.headers().await?, vec!["city", "country", "pop"]);
442 /// rdr.set_headers(StringRecord::from(vec!["a", "b", "c"]));
443 /// assert_eq!(rdr.headers().await?, vec!["a", "b", "c"]);
444 ///
445 /// Ok(())
446 /// }
447 /// ```
448 #[inline]
449 pub fn set_headers(&mut self, headers: StringRecord) {
450 self.0.set_headers(headers);
451 }
452
453 /// Set the headers of this CSV parser manually as raw bytes.
454 ///
455 /// This overrides any other setting (including `set_headers`). Any
456 /// automatic detection of headers is disabled. This may be called at any
457 /// time.
458 ///
459 /// # Example
460 ///
461 /// ```
462 /// use std::error::Error;
463 /// use csv_async::{AsyncReader, ByteRecord};
464 ///
465 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
466 /// async fn example() -> Result<(), Box<dyn Error>> {
467 /// let data = "\
468 /// city,country,pop
469 /// Boston,United States,4628910
470 /// ";
471 /// let mut rdr = AsyncReader::from_reader(data.as_bytes());
472 ///
473 /// assert_eq!(rdr.byte_headers().await?, vec!["city", "country", "pop"]);
474 /// rdr.set_byte_headers(ByteRecord::from(vec!["a", "b", "c"]));
475 /// assert_eq!(rdr.byte_headers().await?, vec!["a", "b", "c"]);
476 ///
477 /// Ok(())
478 /// }
479 /// ```
480 #[inline]
481 pub fn set_byte_headers(&mut self, headers: ByteRecord) {
482 self.0.set_byte_headers(headers);
483 }
484
485 /// Read a single row into the given record. Returns false when no more
486 /// records could be read.
487 ///
488 /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
489 /// default), then this will treat initial row as headers and read the first data record.
490 ///
491 /// This method is useful when you want to read records as fast as
492 /// as possible. It's less ergonomic than an iterator, but it permits the
493 /// caller to reuse the `StringRecord` allocation, which usually results
494 /// in higher throughput.
495 ///
496 /// Records read via this method are guaranteed to have a position set
497 /// on them, even if the reader is at EOF or if an error is returned.
498 ///
499 /// # Example
500 ///
501 /// ```
502 /// use std::error::Error;
503 /// use csv_async::{AsyncReader, StringRecord};
504 ///
505 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
506 /// async fn example() -> Result<(), Box<dyn Error>> {
507 /// let data = "\
508 /// city,country,pop
509 /// Boston,United States,4628910
510 /// ";
511 /// let mut rdr = AsyncReader::from_reader(data.as_bytes());
512 /// let mut record = StringRecord::new();
513 ///
514 /// if rdr.read_record(&mut record).await? {
515 /// assert_eq!(record, vec!["Boston", "United States", "4628910"]);
516 /// Ok(())
517 /// } else {
518 /// Err(From::from("expected at least one record but got none"))
519 /// }
520 /// }
521 /// ```
522 #[inline]
523 pub async fn read_record(&mut self, record: &mut StringRecord) -> Result<bool> {
524 self.0.read_record(record).await
525 }
526
527 /// Read a single row into the given byte record. Returns false when no
528 /// more records could be read.
529 ///
530 /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
531 /// default), then this will treat initial row as headers and read the first data record.
532 ///
533 /// This method is useful when you want to read records as fast as
534 /// as possible. It's less ergonomic than an iterator, but it permits the
535 /// caller to reuse the `ByteRecord` allocation, which usually results
536 /// in higher throughput.
537 ///
538 /// Records read via this method are guaranteed to have a position set
539 /// on them, even if the reader is at EOF or if an error is returned.
540 ///
541 /// # Example
542 ///
543 /// ```
544 /// use std::error::Error;
545 /// use csv_async::{ByteRecord, AsyncReader};
546 ///
547 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
548 /// async fn example() -> Result<(), Box<dyn Error>> {
549 /// let data = "\
550 /// city,country,pop
551 /// Boston,United States,4628910
552 /// ";
553 /// let mut rdr = AsyncReader::from_reader(data.as_bytes());
554 /// let mut record = ByteRecord::new();
555 ///
556 /// if rdr.read_byte_record(&mut record).await? {
557 /// assert_eq!(record, vec!["Boston", "United States", "4628910"]);
558 /// Ok(())
559 /// } else {
560 /// Err(From::from("expected at least one record but got none"))
561 /// }
562 /// }
563 /// ```
564 #[inline]
565 pub async fn read_byte_record(&mut self, record: &mut ByteRecord) -> Result<bool> {
566 self.0.read_byte_record(record).await
567 }
568
569 /// Return the current position of this CSV reader.
570 ///
571 /// The byte offset in the position returned can be used to `seek` this
572 /// reader. In particular, seeking to a position returned here on the same
573 /// data will result in parsing the same subsequent record.
574 ///
575 /// # Example: reading the position
576 ///
577 /// ```
578 /// use std::error::Error;
579 /// use std::io;
580 /// use csv_async::{AsyncReader, Position};
581 /// use tokio_stream::StreamExt;
582 ///
583 /// # fn main() { tokio1::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
584 /// async fn example() -> Result<(), Box<dyn Error>> {
585 /// let data = "\
586 /// city,country,popcount
587 /// Boston,United States,4628910
588 /// Concord,United States,42695
589 /// ";
590 /// let rdr = AsyncReader::from_reader(io::Cursor::new(data));
591 /// let mut iter = rdr.into_records();
592 /// let mut pos = Position::new();
593 /// loop {
594 /// let next = iter.next().await;
595 /// if let Some(next) = next {
596 /// pos = next?.position().expect("Cursor should be at some valid position").clone();
597 /// } else {
598 /// break;
599 /// }
600 /// }
601 ///
602 /// // `pos` should now be the position immediately before the last
603 /// // record.
604 /// assert_eq!(pos.byte(), 51);
605 /// assert_eq!(pos.line(), 3);
606 /// assert_eq!(pos.record(), 2);
607 /// Ok(())
608 /// }
609 /// ```
610 #[inline]
611 pub fn position(&self) -> &Position {
612 self.0.position()
613 }
614
615 /// Returns true if and only if this reader has been exhausted.
616 ///
617 /// When this returns true, no more records can be read from this reader.
618 ///
619 /// # Example
620 ///
621 /// ```
622 /// # use tokio1 as tokio;
623 /// use std::error::Error;
624 /// use tokio::io;
625 /// use tokio_stream::StreamExt;
626 /// use csv_async::{AsyncReader, Position};
627 ///
628 /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
629 /// async fn example() -> Result<(), Box<dyn Error>> {
630 /// let data = "\
631 /// city,country,popcount
632 /// Boston,United States,4628910
633 /// Concord,United States,42695
634 /// ";
635 /// let mut rdr = AsyncReader::from_reader(data.as_bytes());
636 /// assert!(!rdr.is_done());
637 /// {
638 /// let mut records = rdr.records();
639 /// while let Some(record) = records.next().await {
640 /// let _ = record?;
641 /// }
642 /// }
643 /// assert!(rdr.is_done());
644 /// Ok(())
645 /// }
646 /// ```
647 #[inline]
648 pub fn is_done(&self) -> bool {
649 self.0.is_done()
650 }
651
652 /// Returns true if and only if this reader has been configured to
653 /// interpret the first record as a header record.
654 #[inline]
655 pub fn has_headers(&self) -> bool {
656 self.0.has_headers()
657 }
658
659 /// Returns a reference to the underlying reader.
660 #[inline]
661 pub fn get_ref(&self) -> &R {
662 self.0.get_ref()
663 }
664
665 /// Returns a mutable reference to the underlying reader.
666 #[inline]
667 pub fn get_mut(&mut self) -> &mut R {
668 self.0.get_mut()
669 }
670
671 /// Unwraps this CSV reader, returning the underlying reader.
672 ///
673 /// Note that any leftover data inside this reader's internal buffer is
674 /// lost.
675 #[inline]
676 pub fn into_inner(self) -> R {
677 self.0.into_inner()
678 }
679}
680
681impl<R: io::AsyncRead + io::AsyncSeek + std::marker::Unpin> AsyncReader<R> {
682 /// Seeks the underlying reader to the position given.
683 ///
684 /// This comes with a few caveats:
685 ///
686 /// * Any internal buffer associated with this reader is cleared.
687 /// * If the given position does not correspond to a position immediately
688 /// before the start of a record, then the behavior of this reader is
689 /// unspecified.
690 /// * Any special logic that skips the first record in the CSV reader
691 /// when reading or iterating over records is disabled.
692 ///
693 /// If the given position has a byte offset equivalent to the current
694 /// position, then no seeking is performed.
695 ///
696 /// If the header row has not already been read, then this will attempt
697 /// to read the header row before seeking. Therefore, it is possible that
698 /// this returns an error associated with reading CSV data.
699 ///
700 /// Note that seeking is performed based only on the byte offset in the
701 /// given position. Namely, the record or line numbers in the position may
702 /// be incorrect, but this will cause any future position generated by
703 /// this CSV reader to be similarly incorrect.
704 ///
705 /// # Example: seek to parse a record twice
706 ///
707 /// ```
708 /// # use tokio1 as tokio;
709 /// use std::error::Error;
710 /// use tokio::io;
711 /// use tokio_stream::StreamExt;
712 /// use csv_async::{AsyncReader, Position};
713 ///
714 /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
715 /// async fn example() -> Result<(), Box<dyn Error>> {
716 /// let data = "\
717 /// city,country,popcount
718 /// Boston,United States,4628910
719 /// Concord,United States,42695
720 /// ";
721 /// let mut rdr = AsyncReader::from_reader(std::io::Cursor::new(data));
722 /// let mut pos = Position::new();
723 /// {
724 /// let mut records = rdr.records();
725 /// loop {
726 /// let next = records.next().await;
727 /// if let Some(next) = next {
728 /// pos = next?.position().expect("Cursor should be at some valid position").clone();
729 /// } else {
730 /// break;
731 /// }
732 /// }
733 /// }
734 ///
735 /// {
736 /// // Now seek the reader back to `pos`. This will let us read the
737 /// // last record again.
738 /// rdr.seek(pos).await?;
739 /// let mut records = rdr.into_records();
740 /// if let Some(result) = records.next().await {
741 /// let record = result?;
742 /// assert_eq!(record, vec!["Concord", "United States", "42695"]);
743 /// Ok(())
744 /// } else {
745 /// Err(From::from("expected at least one record but got none"))
746 /// }
747 /// }
748 /// }
749 /// ```
750 #[inline]
751 pub async fn seek(&mut self, pos: Position) -> Result<()> {
752 self.0.seek(pos).await
753 }
754
755 /// This is like `seek`, but provides direct control over how the seeking
756 /// operation is performed via `io::SeekFrom`.
757 ///
758 /// The `pos` position given *should* correspond the position indicated
759 /// by `seek_from`, but there is no requirement. If the `pos` position
760 /// given is incorrect, then the position information returned by this
761 /// reader will be similarly incorrect.
762 ///
763 /// If the header row has not already been read, then this will attempt
764 /// to read the header row before seeking. Therefore, it is possible that
765 /// this returns an error associated with reading CSV data.
766 ///
767 /// Unlike `seek`, this will always cause an actual seek to be performed.
768 #[inline]
769 pub async fn seek_raw(
770 &mut self,
771 seek_from: io::SeekFrom,
772 pos: Position,
773 ) -> Result<()> {
774 self.0.seek_raw(seek_from, pos).await
775 }
776
777 /// Rewinds the underlying reader to first data record.
778 ///
779 /// Function is aware of header presence.
780 /// After `rewind` record iterators will return first data record (skipping header if present), while
781 /// after `seek(0)` they will return header row (even if `has_header` is set).
782 ///
783 /// # Example: Reads the same data multiply times
784 ///
785 /// ```
786 /// # use tokio1 as tokio;
787 /// use std::error::Error;
788 /// use tokio::io;
789 /// use tokio_stream::StreamExt;
790 /// use csv_async::AsyncReader;
791 ///
792 /// # fn main() { tokio::runtime::Runtime::new().unwrap().block_on(async {example().await.unwrap()}); }
793 /// async fn example() -> Result<(), Box<dyn Error>> {
794 /// let data = "\
795 /// city,country,popcount
796 /// Boston,United States,4628910
797 /// Concord,United States,42695
798 /// ";
799 /// let mut rdr = AsyncReader::from_reader(std::io::Cursor::new(data));
800 /// let mut output = Vec::new();
801 /// loop {
802 /// let mut records = rdr.records();
803 /// while let Some(rec) = records.next().await {
804 /// output.push(rec?);
805 /// }
806 /// if output.len() >= 6 {
807 /// break;
808 /// } else {
809 /// drop(records);
810 /// rdr.rewind().await?;
811 /// }
812 /// }
813 /// assert_eq!(output,
814 /// vec![
815 /// vec!["Boston", "United States", "4628910"],
816 /// vec!["Concord", "United States", "42695"],
817 /// vec!["Boston", "United States", "4628910"],
818 /// vec!["Concord", "United States", "42695"],
819 /// vec!["Boston", "United States", "4628910"],
820 /// vec!["Concord", "United States", "42695"],
821 /// ]);
822 /// Ok(())
823 /// }
824 /// ```
825 #[inline]
826 pub async fn rewind(&mut self) -> Result<()> {
827 self.0.rewind().await
828 }
829}
830
831#[cfg(test)]
832mod tests {
833 use std::pin::Pin;
834 use std::task::{Context, Poll};
835
836 use tokio::io;
837 use tokio_stream::StreamExt;
838 use tokio::runtime::Runtime;
839
840 use crate::byte_record::ByteRecord;
841 use crate::error::ErrorKind;
842 use crate::string_record::StringRecord;
843 use crate::Trim;
844
845 use super::{Position, AsyncReaderBuilder, AsyncReader};
846
847 fn b(s: &str) -> &[u8] {
848 s.as_bytes()
849 }
850 fn s(b: &[u8]) -> &str {
851 ::std::str::from_utf8(b).unwrap()
852 }
853
854 fn newpos(byte: u64, line: u64, record: u64) -> Position {
855 let mut p = Position::new();
856 p.set_byte(byte).set_line(line).set_record(record);
857 p
858 }
859
860 async fn count(stream: impl StreamExt) -> usize {
861 stream.fold(0, |acc, _| acc + 1 ).await
862 }
863
864 #[tokio::test]
865 async fn read_byte_record() {
866 let data = b("foo,\"b,ar\",baz\nabc,mno,xyz");
867 let mut rdr =
868 AsyncReaderBuilder::new().has_headers(false).create_reader(data);
869 let mut rec = ByteRecord::new();
870
871 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
872 assert_eq!(3, rec.len());
873 assert_eq!("foo", s(&rec[0]));
874 assert_eq!("b,ar", s(&rec[1]));
875 assert_eq!("baz", s(&rec[2]));
876
877 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
878 assert_eq!(3, rec.len());
879 assert_eq!("abc", s(&rec[0]));
880 assert_eq!("mno", s(&rec[1]));
881 assert_eq!("xyz", s(&rec[2]));
882
883 assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
884 }
885
886 #[tokio::test]
887 async fn read_trimmed_records_and_headers() {
888 let data = b("foo, bar,\tbaz\n 1, 2, 3\n1\t,\t,3\t\t");
889 let mut rdr = AsyncReaderBuilder::new()
890 .has_headers(true)
891 .trim(Trim::All)
892 .create_reader(data);
893 let mut rec = ByteRecord::new();
894 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
895 assert_eq!("1", s(&rec[0]));
896 assert_eq!("2", s(&rec[1]));
897 assert_eq!("3", s(&rec[2]));
898 let mut rec = StringRecord::new();
899 assert!(rdr.read_record(&mut rec).await.unwrap());
900 assert_eq!("1", &rec[0]);
901 assert_eq!("", &rec[1]);
902 assert_eq!("3", &rec[2]);
903 {
904 let headers = rdr.headers().await.unwrap();
905 assert_eq!(3, headers.len());
906 assert_eq!("foo", &headers[0]);
907 assert_eq!("bar", &headers[1]);
908 assert_eq!("baz", &headers[2]);
909 }
910 }
911
912 #[tokio::test]
913 async fn read_trimmed_header() {
914 let data = b("foo, bar,\tbaz\n 1, 2, 3\n1\t,\t,3\t\t");
915 let mut rdr = AsyncReaderBuilder::new()
916 .has_headers(true)
917 .trim(Trim::Headers)
918 .create_reader(data);
919 let mut rec = ByteRecord::new();
920 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
921 assert_eq!(" 1", s(&rec[0]));
922 assert_eq!(" 2", s(&rec[1]));
923 assert_eq!(" 3", s(&rec[2]));
924 {
925 let headers = rdr.headers().await.unwrap();
926 assert_eq!(3, headers.len());
927 assert_eq!("foo", &headers[0]);
928 assert_eq!("bar", &headers[1]);
929 assert_eq!("baz", &headers[2]);
930 }
931 }
932
933 #[tokio::test]
934 async fn read_trimed_header_invalid_utf8() {
935 let data = &b"foo, b\xFFar,\tbaz\na,b,c\nd,e,f"[..];
936 let mut rdr = AsyncReaderBuilder::new()
937 .has_headers(true)
938 .trim(Trim::Headers)
939 .create_reader(data);
940 let mut rec = StringRecord::new();
941
942 // force the headers to be read
943 let _ = rdr.read_record(&mut rec).await;
944 // Check the byte headers are trimmed
945 {
946 let headers = rdr.byte_headers().await.unwrap();
947 assert_eq!(3, headers.len());
948 assert_eq!(b"foo", &headers[0]);
949 assert_eq!(b"b\xFFar", &headers[1]);
950 assert_eq!(b"baz", &headers[2]);
951 }
952 match *rdr.headers().await.unwrap_err().kind() {
953 ErrorKind::Utf8 { pos: Some(ref pos), ref err } => {
954 assert_eq!(pos, &newpos(0, 1, 0));
955 assert_eq!(err.field(), 1);
956 assert_eq!(err.valid_up_to(), 3);
957 }
958 ref err => panic!("match failed, got {:?}", err),
959 }
960 }
961
962 #[tokio::test]
963 async fn read_trimmed_records() {
964 let data = b("foo, bar,\tbaz\n 1, 2, 3\n1\t,\t,3\t\t");
965 let mut rdr = AsyncReaderBuilder::new()
966 .has_headers(true)
967 .trim(Trim::Fields)
968 .create_reader(data);
969 let mut rec = ByteRecord::new();
970 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
971 assert_eq!("1", s(&rec[0]));
972 assert_eq!("2", s(&rec[1]));
973 assert_eq!("3", s(&rec[2]));
974 {
975 let headers = rdr.headers().await.unwrap();
976 assert_eq!(3, headers.len());
977 assert_eq!("foo", &headers[0]);
978 assert_eq!(" bar", &headers[1]);
979 assert_eq!("\tbaz", &headers[2]);
980 }
981 }
982
983 #[tokio::test]
984 async fn read_record_unequal_fails() {
985 let data = b("foo\nbar,baz");
986 let mut rdr =
987 AsyncReaderBuilder::new().has_headers(false).create_reader(data);
988 let mut rec = ByteRecord::new();
989
990 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
991 assert_eq!(1, rec.len());
992 assert_eq!("foo", s(&rec[0]));
993
994 match rdr.read_byte_record(&mut rec).await {
995 Err(err) => match *err.kind() {
996 ErrorKind::UnequalLengths {
997 expected_len: 1,
998 ref pos,
999 len: 2,
1000 } => {
1001 assert_eq!(pos, &Some(newpos(4, 2, 1)));
1002 }
1003 ref wrong => panic!("match failed, got {:?}", wrong),
1004 },
1005 wrong => panic!("match failed, got {:?}", wrong),
1006 }
1007 }
1008
1009 #[tokio::test]
1010 async fn read_record_unequal_ok() {
1011 let data = b("foo\nbar,baz");
1012 let mut rdr = AsyncReaderBuilder::new()
1013 .has_headers(false)
1014 .flexible(true)
1015 .create_reader(data);
1016 let mut rec = ByteRecord::new();
1017
1018 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1019 assert_eq!(1, rec.len());
1020 assert_eq!("foo", s(&rec[0]));
1021
1022 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1023 assert_eq!(2, rec.len());
1024 assert_eq!("bar", s(&rec[0]));
1025 assert_eq!("baz", s(&rec[1]));
1026
1027 assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
1028 }
1029
1030 // This tests that even if we get a CSV error, we can continue reading
1031 // if we want.
1032 #[tokio::test]
1033 async fn read_record_unequal_continue() {
1034 let data = b("foo\nbar,baz\nquux");
1035 let mut rdr =
1036 AsyncReaderBuilder::new().has_headers(false).create_reader(data);
1037 let mut rec = ByteRecord::new();
1038
1039 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1040 assert_eq!(1, rec.len());
1041 assert_eq!("foo", s(&rec[0]));
1042
1043 match rdr.read_byte_record(&mut rec).await {
1044 Err(err) => match err.kind() {
1045 &ErrorKind::UnequalLengths {
1046 expected_len: 1,
1047 ref pos,
1048 len: 2,
1049 } => {
1050 assert_eq!(pos, &Some(newpos(4, 2, 1)));
1051 }
1052 wrong => panic!("match failed, got {:?}", wrong),
1053 },
1054 wrong => panic!("match failed, got {:?}", wrong),
1055 }
1056
1057 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1058 assert_eq!(1, rec.len());
1059 assert_eq!("quux", s(&rec[0]));
1060
1061 assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
1062 }
1063
1064 #[tokio::test]
1065 async fn read_record_headers() {
1066 let data = b("foo,bar,baz\na,b,c\nd,e,f");
1067 let mut rdr = AsyncReaderBuilder::new().has_headers(true).create_reader(data);
1068 let mut rec = StringRecord::new();
1069
1070 assert!(rdr.read_record(&mut rec).await.unwrap());
1071 assert_eq!(3, rec.len());
1072 assert_eq!("a", &rec[0]);
1073
1074 assert!(rdr.read_record(&mut rec).await.unwrap());
1075 assert_eq!(3, rec.len());
1076 assert_eq!("d", &rec[0]);
1077
1078 assert!(!rdr.read_record(&mut rec).await.unwrap());
1079
1080 {
1081 let headers = rdr.byte_headers().await.unwrap();
1082 assert_eq!(3, headers.len());
1083 assert_eq!(b"foo", &headers[0]);
1084 assert_eq!(b"bar", &headers[1]);
1085 assert_eq!(b"baz", &headers[2]);
1086 }
1087 {
1088 let headers = rdr.headers().await.unwrap();
1089 assert_eq!(3, headers.len());
1090 assert_eq!("foo", &headers[0]);
1091 assert_eq!("bar", &headers[1]);
1092 assert_eq!("baz", &headers[2]);
1093 }
1094 }
1095
1096 #[tokio::test]
1097 async fn read_record_headers_invalid_utf8() {
1098 let data = &b"foo,b\xFFar,baz\na,b,c\nd,e,f"[..];
1099 let mut rdr = AsyncReaderBuilder::new().has_headers(true).create_reader(data);
1100 let mut rec = StringRecord::new();
1101
1102 assert!(rdr.read_record(&mut rec).await.unwrap());
1103 assert_eq!(3, rec.len());
1104 assert_eq!("a", &rec[0]);
1105
1106 assert!(rdr.read_record(&mut rec).await.unwrap());
1107 assert_eq!(3, rec.len());
1108 assert_eq!("d", &rec[0]);
1109
1110 assert!(!rdr.read_record(&mut rec).await.unwrap());
1111
1112 // Check that we can read the headers as raw bytes, but that
1113 // if we read them as strings, we get an appropriate UTF-8 error.
1114 {
1115 let headers = rdr.byte_headers().await.unwrap();
1116 assert_eq!(3, headers.len());
1117 assert_eq!(b"foo", &headers[0]);
1118 assert_eq!(b"b\xFFar", &headers[1]);
1119 assert_eq!(b"baz", &headers[2]);
1120 }
1121 match *rdr.headers().await.unwrap_err().kind() {
1122 ErrorKind::Utf8 { pos: Some(ref pos), ref err } => {
1123 assert_eq!(pos, &newpos(0, 1, 0));
1124 assert_eq!(err.field(), 1);
1125 assert_eq!(err.valid_up_to(), 1);
1126 }
1127 ref err => panic!("match failed, got {:?}", err),
1128 }
1129 }
1130
1131 #[tokio::test]
1132 async fn read_record_no_headers_before() {
1133 let data = b("foo,bar,baz\na,b,c\nd,e,f");
1134 let mut rdr =
1135 AsyncReaderBuilder::new().has_headers(false).create_reader(data);
1136 let mut rec = StringRecord::new();
1137
1138 {
1139 let headers = rdr.headers().await.unwrap();
1140 assert_eq!(3, headers.len());
1141 assert_eq!("foo", &headers[0]);
1142 assert_eq!("bar", &headers[1]);
1143 assert_eq!("baz", &headers[2]);
1144 }
1145
1146 assert!(rdr.read_record(&mut rec).await.unwrap());
1147 assert_eq!(3, rec.len());
1148 assert_eq!("foo", &rec[0]);
1149
1150 assert!(rdr.read_record(&mut rec).await.unwrap());
1151 assert_eq!(3, rec.len());
1152 assert_eq!("a", &rec[0]);
1153
1154 assert!(rdr.read_record(&mut rec).await.unwrap());
1155 assert_eq!(3, rec.len());
1156 assert_eq!("d", &rec[0]);
1157
1158 assert!(!rdr.read_record(&mut rec).await.unwrap());
1159 }
1160
1161 #[tokio::test]
1162 async fn read_record_no_headers_after() {
1163 let data = b("foo,bar,baz\na,b,c\nd,e,f");
1164 let mut rdr =
1165 AsyncReaderBuilder::new().has_headers(false).create_reader(data);
1166 let mut rec = StringRecord::new();
1167
1168 assert!(rdr.read_record(&mut rec).await.unwrap());
1169 assert_eq!(3, rec.len());
1170 assert_eq!("foo", &rec[0]);
1171
1172 assert!(rdr.read_record(&mut rec).await.unwrap());
1173 assert_eq!(3, rec.len());
1174 assert_eq!("a", &rec[0]);
1175
1176 assert!(rdr.read_record(&mut rec).await.unwrap());
1177 assert_eq!(3, rec.len());
1178 assert_eq!("d", &rec[0]);
1179
1180 assert!(!rdr.read_record(&mut rec).await.unwrap());
1181
1182 let headers = rdr.headers().await.unwrap();
1183 assert_eq!(3, headers.len());
1184 assert_eq!("foo", &headers[0]);
1185 assert_eq!("bar", &headers[1]);
1186 assert_eq!("baz", &headers[2]);
1187 }
1188
1189 #[tokio::test]
1190 async fn seek() {
1191 let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
1192 let mut rdr = AsyncReaderBuilder::new().create_reader(std::io::Cursor::new(data));
1193 rdr.seek(newpos(18, 3, 2)).await.unwrap();
1194
1195 let mut rec = StringRecord::new();
1196
1197 assert_eq!(18, rdr.position().byte());
1198 assert!(rdr.read_record(&mut rec).await.unwrap());
1199 assert_eq!(3, rec.len());
1200 assert_eq!("d", &rec[0]);
1201
1202 assert_eq!(24, rdr.position().byte());
1203 assert_eq!(4, rdr.position().line());
1204 assert_eq!(3, rdr.position().record());
1205 assert!(rdr.read_record(&mut rec).await.unwrap());
1206 assert_eq!(3, rec.len());
1207 assert_eq!("g", &rec[0]);
1208
1209 assert!(!rdr.read_record(&mut rec).await.unwrap());
1210 }
1211
1212 // Test that we can read headers after seeking even if the headers weren't
1213 // explicit read before seeking.
1214 #[tokio::test]
1215 async fn seek_headers_after() {
1216 let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
1217 let mut rdr = AsyncReaderBuilder::new().create_reader(std::io::Cursor::new(data));
1218 rdr.seek(newpos(18, 3, 2)).await.unwrap();
1219 assert_eq!(rdr.headers().await.unwrap(), vec!["foo", "bar", "baz"]);
1220}
1221
1222 // Test that we can read headers after seeking if the headers were read
1223 // before seeking.
1224 #[tokio::test]
1225 async fn seek_headers_before_after() {
1226 let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
1227 let mut rdr = AsyncReaderBuilder::new().create_reader(std::io::Cursor::new(data));
1228 let headers = rdr.headers().await.unwrap().clone();
1229 rdr.seek(newpos(18, 3, 2)).await.unwrap();
1230 assert_eq!(&headers, rdr.headers().await.unwrap());
1231 }
1232
1233 // Test that even if we didn't read headers before seeking, if we seek to
1234 // the current byte offset, then no seeking is done and therefore we can
1235 // still read headers after seeking.
1236 #[tokio::test]
1237 async fn seek_headers_no_actual_seek() {
1238 let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
1239 let mut rdr = AsyncReaderBuilder::new().create_reader(std::io::Cursor::new(data));
1240 rdr.seek(Position::new()).await.unwrap();
1241 assert_eq!("foo", &rdr.headers().await.unwrap()[0]);
1242 }
1243
1244 #[tokio::test]
1245 async fn rewind() {
1246 let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
1247 let mut rdr = AsyncReaderBuilder::new().create_reader(std::io::Cursor::new(data));
1248
1249 let mut rec = StringRecord::new();
1250 assert!(rdr.read_record(&mut rec).await.unwrap());
1251 assert_eq!(3, rec.len());
1252 assert_eq!("a", &rec[0]);
1253
1254 // assert_eq!(18, rdr.position().byte());
1255 assert!(rdr.read_record(&mut rec).await.unwrap());
1256 assert_eq!(3, rec.len());
1257 assert_eq!("d", &rec[0]);
1258
1259 rdr.rewind().await.unwrap();
1260
1261 assert!(rdr.read_record(&mut rec).await.unwrap());
1262 assert_eq!(3, rec.len());
1263 assert_eq!("a", &rec[0]);
1264 }
1265
1266 // Test that position info is reported correctly in absence of headers.
1267 #[tokio::test]
1268 async fn positions_no_headers() {
1269 let mut rdr = AsyncReaderBuilder::new()
1270 .has_headers(false)
1271 .create_reader("a,b,c\nx,y,z".as_bytes())
1272 .into_records();
1273
1274 let pos = rdr.next().await.unwrap().unwrap().position().unwrap().clone();
1275 assert_eq!(pos.byte(), 0);
1276 assert_eq!(pos.line(), 1);
1277 assert_eq!(pos.record(), 0);
1278
1279 let pos = rdr.next().await.unwrap().unwrap().position().unwrap().clone();
1280 assert_eq!(pos.byte(), 6);
1281 assert_eq!(pos.line(), 2);
1282 assert_eq!(pos.record(), 1);
1283
1284 // Test that we are at end of stream, and properly signal this.
1285 assert!(rdr.next().await.is_none());
1286 // Testing that we are not panic, trying to pass over end of stream (Issue#22)
1287 assert!(rdr.next().await.is_none());
1288 }
1289
1290 // Test that position info is reported correctly with headers.
1291 #[tokio::test]
1292 async fn positions_headers() {
1293 let mut rdr = AsyncReaderBuilder::new()
1294 .has_headers(true)
1295 .create_reader("a,b,c\nx,y,z".as_bytes())
1296 .into_records();
1297
1298 let pos = rdr.next().await.unwrap().unwrap().position().unwrap().clone();
1299 assert_eq!(pos.byte(), 6);
1300 assert_eq!(pos.line(), 2);
1301 assert_eq!(pos.record(), 1);
1302 }
1303
1304 // Test that reading headers on empty data yields an empty record.
1305 #[tokio::test]
1306 async fn headers_on_empty_data() {
1307 let mut rdr = AsyncReaderBuilder::new().create_reader("".as_bytes());
1308 let r = rdr.byte_headers().await.unwrap();
1309 assert_eq!(r.len(), 0);
1310 }
1311
1312 // Test that reading the first record on empty data works.
1313 #[tokio::test]
1314 async fn no_headers_on_empty_data() {
1315 let mut rdr =
1316 AsyncReaderBuilder::new().has_headers(false).create_reader("".as_bytes());
1317 assert_eq!(count(rdr.records()).await, 0);
1318 }
1319
1320 // Test that reading the first record on empty data works, even if
1321 // we've tried to read headers before hand.
1322 #[tokio::test]
1323 async fn no_headers_on_empty_data_after_headers() {
1324 let mut rdr =
1325 AsyncReaderBuilder::new().has_headers(false).create_reader("".as_bytes());
1326 assert_eq!(rdr.headers().await.unwrap().len(), 0);
1327 assert_eq!(count(rdr.records()).await, 0);
1328 }
1329
1330 #[test]
1331 fn no_infinite_loop_on_io_errors() {
1332 struct FailingRead;
1333 impl io::AsyncRead for FailingRead {
1334 fn poll_read(
1335 self: Pin<&mut Self>,
1336 _cx: &mut Context,
1337 _buf: &mut tokio::io::ReadBuf
1338 ) -> Poll<Result<(), io::Error>> {
1339 Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "Broken reader")))
1340 }
1341 }
1342 impl Unpin for FailingRead {}
1343
1344 Runtime::new().unwrap().block_on(async {
1345 let mut record_results = AsyncReader::from_reader(FailingRead).into_records();
1346 let first_result = record_results.next().await;
1347 assert!(
1348 matches!(&first_result, Some(Err(e)) if matches!(e.kind(), crate::ErrorKind::Io(_)))
1349 );
1350 assert!(record_results.next().await.is_none());
1351 });
1352 }
1353}