csv_async/async_readers/mod.rs
1use std::future::Future;
2use std::pin::Pin;
3use std::result;
4use std::task::{Context, Poll};
5
6cfg_if::cfg_if! {
7if #[cfg(feature = "tokio")] {
8 use tokio::io::{self, AsyncBufRead, AsyncSeekExt};
9 use tokio_stream::Stream;
10} else {
11 use futures::io::{self, AsyncBufRead, AsyncSeekExt};
12 use futures::stream::Stream;
13}}
14
15use csv_core::ReaderBuilder as CoreReaderBuilder;
16use csv_core::Reader as CoreReader;
17#[cfg(feature = "with_serde")]
18use serde::de::DeserializeOwned;
19
20use crate::{Terminator, Trim};
21use crate::byte_record::{ByteRecord, Position};
22use crate::error::{Error, ErrorKind, Result, Utf8Error};
23use crate::string_record::StringRecord;
24
25cfg_if::cfg_if! {
26if #[cfg(feature = "tokio")] {
27 pub mod ardr_tokio;
28} else {
29 pub mod ardr_futures;
30}}
31
32#[cfg(all(feature = "with_serde", not(feature = "tokio")))]
33pub mod ades_futures;
34
35#[cfg(all(feature = "with_serde", feature = "tokio"))]
36pub mod ades_tokio;
37
38//-//////////////////////////////////////////////////////////////////////////////////////////////
39//-// Builder
40//-//////////////////////////////////////////////////////////////////////////////////////////////
41
/// Builds a CSV reader with various configuration knobs.
///
/// This builder can be used to tweak the field delimiter, record terminator
/// and more. Once a CSV reader / deserializer is built, its configuration
/// cannot be changed.
///
/// Parser-level options (delimiter, quoting, terminator, ...) are stored in
/// the boxed `csv_core` builder; reader-level options are stored directly in
/// the fields below.
#[derive(Debug)]
pub struct AsyncReaderBuilder {
    /// Capacity, in bytes, of the buffered reader wrapped around the input.
    capacity: usize,
    /// When set, records are allowed to have differing numbers of fields.
    flexible: bool,
    /// When set, the first row is treated as a header row and is not
    /// returned by the record-reading methods.
    has_headers: bool,
    /// Which values, if any, are trimmed of surrounding whitespace.
    trim: Trim,
    /// When set, the stream of records ends after the first I/O error.
    end_on_io_error: bool,
    /// The underlying CSV parser builder.
    ///
    /// We explicitly put this on the heap because CoreReaderBuilder embeds an
    /// entire DFA transition table, which along with other things, tallies up
    /// to almost 500 bytes on the stack.
    builder: Box<CoreReaderBuilder>,
}
61
62impl Default for AsyncReaderBuilder {
63 fn default() -> AsyncReaderBuilder {
64 AsyncReaderBuilder {
65 capacity: 8 * (1 << 10),
66 flexible: false,
67 has_headers: true,
68 trim: Trim::default(),
69 end_on_io_error: true,
70 builder: Box::<CoreReaderBuilder>::default(),
71 }
72 }
73}
74
75impl AsyncReaderBuilder {
76 /// Create a new builder for configuring CSV parsing.
77 ///
78 /// To convert a builder into a reader, call one of the methods starting
79 /// with `from_`.
80 ///
81 /// # Example
82 ///
83 /// ```
84 /// use std::error::Error;
85 /// use futures::stream::StreamExt;
86 /// use csv_async::{AsyncReaderBuilder, StringRecord};
87 ///
88 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
89 /// async fn example() -> Result<(), Box<dyn Error>> {
90 /// let data = "\
91 /// city,country,pop
92 /// Boston,United States,4628910
93 /// Concord,United States,42695
94 /// ";
95 /// let mut rdr = AsyncReaderBuilder::new().create_reader(data.as_bytes());
96 ///
97 /// let records = rdr
98 /// .records()
99 /// .map(Result::unwrap)
100 /// .collect::<Vec<StringRecord>>().await;
101 /// assert_eq!(records, vec![
102 /// vec!["Boston", "United States", "4628910"],
103 /// vec!["Concord", "United States", "42695"],
104 /// ]);
105 /// Ok(())
106 /// }
107 /// ```
108 pub fn new() -> AsyncReaderBuilder {
109 AsyncReaderBuilder::default()
110 }
111
112 /// The field delimiter to use when parsing CSV.
113 ///
114 /// The default is `b','`.
115 ///
116 /// # Example
117 ///
118 /// ```
119 /// use std::error::Error;
120 /// use futures::stream::StreamExt;
121 /// use csv_async::{AsyncReaderBuilder, StringRecord};
122 ///
123 /// # fn main() { async_std::task::block_on(async {example().await}); }
124 /// async fn example() {
125 /// let data = "\
126 /// city;country;pop
127 /// Boston;United States;4628910
128 /// ";
129 /// let mut rdr = AsyncReaderBuilder::new()
130 /// .delimiter(b';')
131 /// .create_reader(data.as_bytes());
132 ///
133 /// let records = rdr
134 /// .records()
135 /// .map(Result::unwrap)
136 /// .collect::<Vec<StringRecord>>().await;
137 /// assert_eq!(records, vec![
138 /// vec!["Boston", "United States", "4628910"],
139 /// ]);
140 /// }
141 /// ```
142 pub fn delimiter(&mut self, delimiter: u8) -> &mut AsyncReaderBuilder {
143 self.builder.delimiter(delimiter);
144 self
145 }
146
147 /// Whether to treat the first row as a special header row.
148 ///
149 /// By default, the first row is treated as a special header row, which
150 /// means the header is never returned by any of the record reading methods
151 /// or iterators. When this is disabled (`yes` set to `false`), the first
152 /// row is not treated specially.
153 ///
154 /// Note that the `headers` and `byte_headers` methods are unaffected by
155 /// whether this is set. Those methods always return the first record.
156 ///
157 /// # Example
158 ///
159 /// This example shows what happens when `has_headers` is disabled.
160 /// Namely, the first row is treated just like any other row.
161 ///
162 /// ```
163 /// use std::error::Error;
164 /// use futures::stream::StreamExt;
165 /// use csv_async::AsyncReaderBuilder;
166 ///
167 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
168 /// async fn example() -> Result<(), Box<dyn Error>> {
169 /// let data = "\
170 /// city,country,pop
171 /// Boston,United States,4628910
172 /// ";
173 /// let mut rdr = AsyncReaderBuilder::new()
174 /// .has_headers(false)
175 /// .create_reader(data.as_bytes());
176 /// let mut iter = rdr.records();
177 ///
178 /// // Read the first record.
179 /// assert_eq!(iter.next().await.unwrap()?, vec!["city", "country", "pop"]);
180 ///
181 /// // Read the second record.
182 /// assert_eq!(iter.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
183 ///
184 /// assert!(iter.next().await.is_none());
185 /// Ok(())
186 /// }
187 /// ```
188 pub fn has_headers(&mut self, yes: bool) -> &mut AsyncReaderBuilder {
189 self.has_headers = yes;
190 self
191 }
192
193 /// Whether the number of fields in records is allowed to change or not.
194 ///
195 /// When disabled (which is the default), parsing CSV data will return an
196 /// error if a record is found with a number of fields different from the
197 /// number of fields in a previous record.
198 ///
199 /// When enabled, this error checking is turned off.
200 ///
201 /// # Example: flexible records enabled
202 ///
203 /// ```
204 /// use std::error::Error;
205 /// use futures::stream::StreamExt;
206 /// use csv_async::AsyncReaderBuilder;
207 ///
208 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
209 /// async fn example() -> Result<(), Box<dyn Error>> {
210 /// // Notice that the first row is missing the population count.
211 /// let data = "\
212 /// city,country,pop
213 /// Boston,United States
214 /// ";
215 /// let mut rdr = AsyncReaderBuilder::new()
216 /// .flexible(true)
217 /// .create_reader(data.as_bytes());
218 /// let mut records = rdr.records();
219 /// assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States"]);
220 /// Ok(())
221 /// }
222 /// ```
223 ///
224 /// # Example: flexible records disabled
225 ///
226 /// This shows the error that appears when records of unequal length
227 /// are found and flexible records have been disabled (which is the
228 /// default).
229 ///
230 /// ```
231 /// use std::error::Error;
232 /// use futures::stream::StreamExt;
233 /// use csv_async::{ErrorKind, AsyncReaderBuilder};
234 ///
235 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
236 /// async fn example() -> Result<(), Box<dyn Error>> {
237 /// // Notice that the first row is missing the population count.
238 /// let data = "\
239 /// city,country,pop
240 /// Boston,United States
241 /// ";
242 /// let mut rdr = AsyncReaderBuilder::new()
243 /// .flexible(false)
244 /// .create_reader(data.as_bytes());
245 ///
246 /// let mut records = rdr.records();
247 /// match records.next().await {
248 /// Some(Err(err)) => match *err.kind() {
249 /// ErrorKind::UnequalLengths { expected_len, len, .. } => {
250 /// // The header row has 3 fields...
251 /// assert_eq!(expected_len, 3);
252 /// // ... but the first row has only 2 fields.
253 /// assert_eq!(len, 2);
254 /// Ok(())
255 /// }
256 /// ref wrong => {
257 /// Err(From::from(format!(
258 /// "expected UnequalLengths error but got {:?}",
259 /// wrong)))
260 /// }
261 /// }
262 /// Some(Ok(rec)) =>
263 /// Err(From::from(format!(
264 /// "expected one errored record but got good record {:?}",
265 /// rec))),
266 /// None =>
267 /// Err(From::from(
268 /// "expected one errored record but got none"))
269 /// }
270 /// }
271 /// ```
272 pub fn flexible(&mut self, yes: bool) -> &mut AsyncReaderBuilder {
273 self.flexible = yes;
274 self
275 }
276
    /// If set, the stream of CSV records ends when the first I/O error occurs.
    /// Otherwise the CSV reader keeps trying to read from the underlying reader.
    /// For an example, see the unit test `behavior_on_io_errors` in the following
    /// [source file](https://github.com/gwierzchowski/csv-async/blob/master/src/async_readers/ardr_futures.rs).
    ///
    /// By default this option is set.
283 pub fn end_on_io_error(&mut self, yes: bool) -> &mut AsyncReaderBuilder {
284 self.end_on_io_error = yes;
285 self
286 }
287
288 /// Whether fields are trimmed of leading and trailing whitespace or not.
289 ///
290 /// By default, no trimming is performed. This method permits one to
291 /// override that behavior and choose one of the following options:
292 ///
293 /// 1. `Trim::Headers` trims only header values.
294 /// 2. `Trim::Fields` trims only non-header or "field" values.
295 /// 3. `Trim::All` trims both header and non-header values.
296 ///
297 /// A value is only interpreted as a header value if this CSV reader is
298 /// configured to read a header record (which is the default).
299 ///
300 /// When reading string records, characters meeting the definition of
301 /// Unicode whitespace are trimmed. When reading byte records, characters
302 /// meeting the definition of ASCII whitespace are trimmed. ASCII
303 /// whitespace characters correspond to the set `[\t\n\v\f\r ]`.
304 ///
305 /// # Example
306 ///
307 /// This example shows what happens when all values are trimmed.
308 ///
309 /// ```
310 /// use std::error::Error;
311 /// use futures::stream::StreamExt;
312 /// use csv_async::{AsyncReaderBuilder, StringRecord, Trim};
313 ///
314 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
315 /// async fn example() -> Result<(), Box<dyn Error>> {
316 /// let data = "\
317 /// city , country , pop
318 /// Boston,\"
319 /// United States\",4628910
320 /// Concord, United States ,42695
321 /// ";
322 /// let mut rdr = AsyncReaderBuilder::new()
323 /// .trim(Trim::All)
324 /// .create_reader(data.as_bytes());
325 /// let records = rdr
326 /// .records()
327 /// .map(Result::unwrap)
328 /// .collect::<Vec<StringRecord>>().await;
329 /// assert_eq!(records, vec![
330 /// vec!["Boston", "United States", "4628910"],
331 /// vec!["Concord", "United States", "42695"],
332 /// ]);
333 /// Ok(())
334 /// }
335 /// ```
336 pub fn trim(&mut self, trim: Trim) -> &mut AsyncReaderBuilder {
337 self.trim = trim;
338 self
339 }
340
341 /// The record terminator to use when parsing CSV.
342 ///
343 /// A record terminator can be any single byte. The default is a special
344 /// value, `Terminator::CRLF`, which treats any occurrence of `\r`, `\n`
345 /// or `\r\n` as a single record terminator.
346 ///
347 /// # Example: `$` as a record terminator
348 ///
349 /// ```
350 /// use std::error::Error;
351 /// use futures::stream::StreamExt;
352 /// use csv_async::{AsyncReaderBuilder, Terminator};
353 ///
354 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
355 /// async fn example() -> Result<(), Box<dyn Error>> {
356 /// let data = "city,country,pop$Boston,United States,4628910";
357 /// let mut rdr = AsyncReaderBuilder::new()
358 /// .terminator(Terminator::Any(b'$'))
359 /// .create_reader(data.as_bytes());
360 /// let mut iter = rdr.records();
361 /// assert_eq!(iter.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
362 /// assert!(iter.next().await.is_none());
363 /// Ok(())
364 /// }
365 /// ```
366 pub fn terminator(&mut self, term: Terminator) -> &mut AsyncReaderBuilder {
367 self.builder.terminator(term.to_core());
368 self
369 }
370
371 /// The quote character to use when parsing CSV.
372 ///
373 /// The default is `b'"'`.
374 ///
375 /// # Example: single quotes instead of double quotes
376 ///
377 /// ```
378 /// use std::error::Error;
379 /// use futures::stream::StreamExt;
380 /// use csv_async::AsyncReaderBuilder;
381 ///
382 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
383 /// async fn example() -> Result<(), Box<dyn Error>> {
384 /// let data = "\
385 /// city,country,pop
386 /// Boston,'United States',4628910
387 /// ";
388 /// let mut rdr = AsyncReaderBuilder::new()
389 /// .quote(b'\'')
390 /// .create_reader(data.as_bytes());
391 /// let mut iter = rdr.records();
392 /// assert_eq!(iter.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
393 /// assert!(iter.next().await.is_none());
394 /// Ok(())
395 /// }
396 /// ```
397 pub fn quote(&mut self, quote: u8) -> &mut AsyncReaderBuilder {
398 self.builder.quote(quote);
399 self
400 }
401
402 /// The escape character to use when parsing CSV.
403 ///
404 /// In some variants of CSV, quotes are escaped using a special escape
405 /// character like `\` (instead of escaping quotes by doubling them).
406 ///
407 /// By default, recognizing these idiosyncratic escapes is disabled.
408 ///
409 /// # Example
410 ///
411 /// ```
412 /// use std::error::Error;
413 /// use futures::stream::StreamExt;
414 /// use csv_async::AsyncReaderBuilder;
415 ///
416 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
417 /// async fn example() -> Result<(), Box<dyn Error>> {
418 /// let data = "\
419 /// city,country,pop
420 /// Boston,\"The \\\"United\\\" States\",4628910
421 /// ";
422 /// let mut rdr = AsyncReaderBuilder::new()
423 /// .escape(Some(b'\\'))
424 /// .create_reader(data.as_bytes());
425 /// let mut records = rdr.records();
426 /// assert_eq!(records.next().await.unwrap()?, vec!["Boston", "The \"United\" States", "4628910"]);
427 /// Ok(())
428 /// }
429 /// ```
430 pub fn escape(&mut self, escape: Option<u8>) -> &mut AsyncReaderBuilder {
431 self.builder.escape(escape);
432 self
433 }
434
435 /// Enable double quote escapes.
436 ///
437 /// This is enabled by default, but it may be disabled. When disabled,
438 /// doubled quotes are not interpreted as escapes.
439 ///
440 /// # Example
441 ///
442 /// ```
443 /// use std::error::Error;
444 /// use futures::stream::StreamExt;
445 /// use csv_async::AsyncReaderBuilder;
446 ///
447 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
448 /// async fn example() -> Result<(), Box<dyn Error>> {
449 /// let data = "\
450 /// city,country,pop
451 /// Boston,\"The \"\"United\"\" States\",4628910
452 /// ";
453 /// let mut rdr = AsyncReaderBuilder::new()
454 /// .double_quote(false)
455 /// .create_reader(data.as_bytes());
456 /// let mut records = rdr.records();
457 /// assert_eq!(records.next().await.unwrap()?, vec!["Boston", "The \"United\"\" States\"", "4628910"]);
458 /// Ok(())
459 /// }
460 /// ```
461 pub fn double_quote(&mut self, yes: bool) -> &mut AsyncReaderBuilder {
462 self.builder.double_quote(yes);
463 self
464 }
465
466 /// Enable or disable quoting.
467 ///
468 /// This is enabled by default, but it may be disabled. When disabled,
469 /// quotes are not treated specially.
470 ///
471 /// # Example
472 ///
473 /// ```
474 /// use std::error::Error;
475 /// use futures::stream::StreamExt;
476 /// use csv_async::AsyncReaderBuilder;
477 ///
478 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
479 /// async fn example() -> Result<(), Box<dyn Error>> {
480 /// let data = "\
481 /// city,country,pop
482 /// Boston,\"The United States,4628910
483 /// ";
484 /// let mut rdr = AsyncReaderBuilder::new()
485 /// .quoting(false)
486 /// .create_reader(data.as_bytes());
487 /// let mut records = rdr.records();
488 /// assert_eq!(records.next().await.unwrap()?, vec!["Boston", "\"The United States", "4628910"]);
489 /// Ok(())
490 /// }
491 /// ```
492 pub fn quoting(&mut self, yes: bool) -> &mut AsyncReaderBuilder {
493 self.builder.quoting(yes);
494 self
495 }
496
497 /// The comment character to use when parsing CSV.
498 ///
499 /// If the start of a record begins with the byte given here, then that
500 /// line is ignored by the CSV parser.
501 ///
502 /// This is disabled by default.
503 ///
504 /// # Example
505 ///
506 /// ```
507 /// use std::error::Error;
508 /// use futures::stream::StreamExt;
509 /// use csv_async::AsyncReaderBuilder;
510 ///
511 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
512 /// async fn example() -> Result<(), Box<dyn Error>> {
513 /// let data = "\
514 /// city,country,pop
515 /// #Concord,United States,42695
516 /// Boston,United States,4628910
517 /// ";
518 /// let mut rdr = AsyncReaderBuilder::new()
519 /// .comment(Some(b'#'))
520 /// .create_reader(data.as_bytes());
521 /// let mut records = rdr.records();
522 /// assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
523 /// assert!(records.next().await.is_none());
524 /// Ok(())
525 /// }
526 /// ```
527 pub fn comment(&mut self, comment: Option<u8>) -> &mut AsyncReaderBuilder {
528 self.builder.comment(comment);
529 self
530 }
531
532 /// A convenience method for specifying a configuration to read ASCII
533 /// delimited text.
534 ///
535 /// This sets the delimiter and record terminator to the ASCII unit
536 /// separator (`\x1F`) and record separator (`\x1E`), respectively.
537 ///
538 /// # Example
539 ///
540 /// ```
541 /// use std::error::Error;
542 /// use futures::stream::StreamExt;
543 /// use csv_async::AsyncReaderBuilder;
544 ///
545 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
546 /// async fn example() -> Result<(), Box<dyn Error>> {
547 /// let data = "\
548 /// city\x1Fcountry\x1Fpop\x1EBoston\x1FUnited States\x1F4628910";
549 /// let mut rdr = AsyncReaderBuilder::new()
550 /// .ascii()
551 /// .create_reader(data.as_bytes());
552 /// let mut records = rdr.byte_records();
553 /// assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
554 /// assert!(records.next().await.is_none());
555 /// Ok(())
556 /// }
557 /// ```
558 pub fn ascii(&mut self) -> &mut AsyncReaderBuilder {
559 self.builder.ascii();
560 self
561 }
562
563 /// Set the capacity (in bytes) of the buffer used in the CSV reader.
564 /// This defaults to a reasonable setting.
565 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut AsyncReaderBuilder {
566 self.capacity = capacity;
567 self
568 }
569
570 /// Enable or disable the NFA for parsing CSV.
571 ///
572 /// This is intended to be a debug option. The NFA is always slower than
573 /// the DFA.
574 #[doc(hidden)]
575 pub fn nfa(&mut self, yes: bool) -> &mut AsyncReaderBuilder {
576 self.builder.nfa(yes);
577 self
578 }
579}
580
581//-//////////////////////////////////////////////////////////////////////////////////////////////
582//-// Reader
583//-//////////////////////////////////////////////////////////////////////////////////////////////
584
/// Bookkeeping shared by the record reader and the deserializer: cached
/// headers, the reader-level options copied from the builder, the current
/// position, and end-of-input tracking.
#[derive(Debug)]
pub struct ReaderState {
    /// When set, this contains the first row of any parsed CSV data.
    ///
    /// This is always populated, regardless of whether `has_headers` is set.
    headers: Option<Headers>,
    /// When set, the first row of parsed CSV data is excluded from things
    /// that read records, like iterators and `read_record`.
    has_headers: bool,
    /// When set, there is no restriction on the length of records. When not
    /// set, every record must have the same number of fields, or else an error
    /// is reported.
    flexible: bool,
    /// Which values (headers and/or fields) are trimmed of whitespace.
    trim: Trim,
    /// The number of fields in the first record parsed.
    first_field_count: Option<u64>,
    /// The current position of the parser.
    ///
    /// Note that this position is only observable by callers at the start
    /// of a record. More granular positions are not supported.
    cur_pos: Position,
    /// Whether the first record has been read or not.
    first: bool,
    /// Whether the reader has been seeked or not.
    seeked: bool,
    /// If set, the stream of CSV records ends when the first I/O error occurs.
    /// Otherwise the reader keeps trying to read from the underlying reader.
    end_on_io_error: bool,
    /// IO errors on the underlying reader will be considered as an EOF for
    /// subsequent read attempts, as it would be incorrect to keep on trying
    /// to read when the underlying reader has broken.
    ///
    /// For clarity, having the best `Debug` impl and in case they need to be
    /// treated differently at some point, we store whether the `EOF` is
    /// considered because an actual EOF happened, or because we encountered
    /// an IO error.
    /// This has no additional runtime cost.
    eof: ReaderEofState,
}
624
/// Whether EOF of the underlying reader has been reached or not.
///
/// IO errors on the underlying reader will be considered as an EOF for
/// subsequent read attempts, as it would be incorrect to keep on trying
/// to read when the underlying reader has broken.
///
/// For clarity, having the best `Debug` impl and in case they need to be
/// treated differently at some point, we store whether the `EOF` is
/// considered because an actual EOF happened, or because we encountered
/// an IO error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReaderEofState {
    /// Neither end of input nor an I/O error has been recorded.
    NotEof,
    /// The CSV parser reported the end of the input.
    Eof,
    /// Filling the read buffer failed with an I/O error. Subsequent reads
    /// stop only if `end_on_io_error` is set on the reader state.
    IOError,
}
641
/// Headers encapsulates any data associated with the headers of CSV data.
///
/// The headers always correspond to the first row. Both the raw-byte and the
/// string form are stored so that either can be handed out by reference.
#[derive(Debug)]
struct Headers {
    /// The header, as raw bytes.
    byte_record: ByteRecord,
    /// The header, as valid UTF-8 (or a UTF-8 error).
    string_record: result::Result<StringRecord, Utf8Error>,
}
652
653impl ReaderState {
654 #[inline(always)]
655 fn add_record(&mut self, record: &ByteRecord) -> Result<()> {
656 let i = self.cur_pos.record();
657 self.cur_pos.set_record(i.checked_add(1).unwrap());
658 if !self.flexible {
659 match self.first_field_count {
660 None => self.first_field_count = Some(record.len() as u64),
661 Some(expected) => {
662 if record.len() as u64 != expected {
663 return Err(Error::new(ErrorKind::UnequalLengths {
664 pos: record.position().map(Clone::clone),
665 expected_len: expected,
666 len: record.len() as u64,
667 }));
668 }
669 }
670 }
671 }
672 Ok(())
673 }
674}
/// CSV async reader internal implementation used by both record reader and
/// deserializer.
///
/// It combines the `csv_core` parser, a buffered wrapper around the
/// caller-supplied reader, and the reader-level state (headers, position,
/// end-of-input tracking).
#[derive(Debug)]
pub struct AsyncReaderImpl<R> {
    /// The underlying CSV parser.
    ///
    /// We explicitly put this on the heap because CoreReader embeds an entire
    /// DFA transition table, which along with other things, tallies up to
    /// almost 500 bytes on the stack.
    core: Box<CoreReader>,
    /// The underlying reader.
    rdr: io::BufReader<R>,
    /// Various state tracking.
    ///
    /// There is more state embedded in the `CoreReader`.
    state: ReaderState,
}
692
693#[must_use = "futures do nothing unless you `.await` or poll them"]
694struct FillBuf<'a, R: AsyncBufRead + ?Sized> {
695 reader: &'a mut R,
696}
697
698impl<R: AsyncBufRead + ?Sized + Unpin> Unpin for FillBuf<'_, R> {}
699
700impl<'a, R: AsyncBufRead + ?Sized + Unpin> FillBuf<'a, R> {
701 pub fn new(reader: &'a mut R) -> Self {
702 Self { reader }
703 }
704}
705
706impl<R: AsyncBufRead + ?Sized + Unpin> Future for FillBuf<'_, R> {
707 type Output = io::Result<usize>;
708
709 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
710 match Pin::new(&mut *self.reader).poll_fill_buf(cx) {
711 Poll::Ready(res) => {
712 match res {
713 Ok(res) => Poll::Ready(Ok(res.len())),
714 Err(e) => Poll::Ready(Err(e))
715 }
716 },
717 Poll::Pending => Poll::Pending
718 }
719 }
720}
721
722impl<'r, R> AsyncReaderImpl<R>
723where
724 R: io::AsyncRead + Unpin + 'r,
725{
726 /// Create a new CSV reader given a builder and a source of underlying
727 /// bytes.
728 fn new(builder: &AsyncReaderBuilder, rdr: R) -> AsyncReaderImpl<R> {
729 AsyncReaderImpl {
730 core: Box::new(builder.builder.build()),
731 rdr: io::BufReader::with_capacity(builder.capacity, rdr),
732 state: ReaderState {
733 headers: None,
734 has_headers: builder.has_headers,
735 flexible: builder.flexible,
736 trim: builder.trim,
737 end_on_io_error: builder.end_on_io_error,
738 first_field_count: None,
739 cur_pos: Position::new(),
740 first: false,
741 seeked: false,
742 eof: ReaderEofState::NotEof,
743 },
744 }
745 }
746
747 /// Returns a reference to the first row read by this parser.
748 ///
749 pub async fn headers(&mut self) -> Result<&StringRecord> {
750 if self.state.headers.is_none() {
751 let mut record = ByteRecord::new();
752 self.read_byte_record_impl(&mut record).await?;
753 self.set_headers_impl(Err(record));
754 }
755 let headers = self.state.headers.as_ref().unwrap();
756 match headers.string_record {
757 Ok(ref record) => Ok(record),
758 Err(ref err) => Err(Error::new(ErrorKind::Utf8 {
759 pos: headers.byte_record.position().map(Clone::clone),
760 err: err.clone(),
761 })),
762 }
763 }
764
765 /// Returns a reference to the first row read by this parser as raw bytes.
766 ///
767 pub async fn byte_headers(&mut self) -> Result<&ByteRecord> {
768 if self.state.headers.is_none() {
769 let mut record = ByteRecord::new();
770 self.read_byte_record_impl(&mut record).await?;
771 self.set_headers_impl(Err(record));
772 }
773 Ok(&self.state.headers.as_ref().unwrap().byte_record)
774 }
775
776 /// Set the headers of this CSV parser manually.
777 ///
778 pub fn set_headers(&mut self, headers: StringRecord) {
779 self.set_headers_impl(Ok(headers));
780 }
781
782 /// Set the headers of this CSV parser manually as raw bytes.
783 ///
784 pub fn set_byte_headers(&mut self, headers: ByteRecord) {
785 self.set_headers_impl(Err(headers));
786 }
787
788 fn set_headers_impl(
789 &mut self,
790 headers: result::Result<StringRecord, ByteRecord>,
791 ) {
792 // If we have string headers, then get byte headers. But if we have
793 // byte headers, then get the string headers (or a UTF-8 error).
794 let (mut str_headers, mut byte_headers) = match headers {
795 Ok(string) => {
796 let bytes = string.clone().into_byte_record();
797 (Ok(string), bytes)
798 }
799 Err(bytes) => {
800 match StringRecord::from_byte_record(bytes.clone()) {
801 Ok(str_headers) => (Ok(str_headers), bytes),
802 Err(err) => (Err(err.utf8_error().clone()), bytes),
803 }
804 }
805 };
806 if self.state.trim.should_trim_headers() {
807 if let Ok(ref mut str_headers) = str_headers.as_mut() {
808 str_headers.trim();
809 }
810 byte_headers.trim();
811 }
812 self.state.headers = Some(Headers {
813 byte_record: byte_headers,
814 string_record: str_headers,
815 });
816 }
817
818 /// Read a single row into the given record. Returns false when no more
819 /// records could be read.
820 pub async fn read_record(&mut self, record: &mut StringRecord) -> Result<bool> {
821 let result = record.read(self).await;
822 // We need to trim again because trimming string records includes
823 // Unicode whitespace. (ByteRecord trimming only includes ASCII
824 // whitespace.)
825 if self.state.trim.should_trim_fields() {
826 record.trim();
827 }
828 result
829 }
830
831 /// Read a single row into the given byte record. Returns false when no
832 /// more records could be read.
833 pub async fn read_byte_record(
834 &mut self,
835 record: &mut ByteRecord,
836 ) -> Result<bool> {
837 if !self.state.seeked && !self.state.has_headers && !self.state.first {
838 // If the caller indicated "no headers" and we haven't yielded the
839 // first record yet, then we should yield our header row if we have
840 // one.
841 if let Some(ref headers) = self.state.headers {
842 self.state.first = true;
843 record.clone_from(&headers.byte_record);
844 if self.state.trim.should_trim_fields() {
845 record.trim();
846 }
847 return Ok(!record.is_empty());
848 }
849 }
850 let ok = self.read_byte_record_impl(record).await?;
851 self.state.first = true;
852 if !self.state.seeked && self.state.headers.is_none() {
853 self.set_headers_impl(Err(record.clone()));
854 // If the end user indicated that we have headers, then we should
855 // never return the first row. Instead, we should attempt to
856 // read and return the next one.
857 if self.state.has_headers {
858 let result = self.read_byte_record_impl(record).await;
859 if self.state.trim.should_trim_fields() {
860 record.trim();
861 }
862 return result;
863 }
864 } else if self.state.trim.should_trim_fields() {
865 record.trim();
866 }
867 Ok(ok)
868 }
869
870 /// Read a byte record from the underlying CSV reader, without accounting
871 /// for headers.
872 #[inline(always)]
873 async fn read_byte_record_impl(
874 &mut self,
875 record: &mut ByteRecord,
876 ) -> Result<bool> {
877 use csv_core::ReadRecordResult::*;
878
879 record.clear();
880 record.set_position(Some(self.state.cur_pos.clone()));
881 match self.state.eof {
882 ReaderEofState::Eof => return Ok(false),
883 ReaderEofState::IOError => {
884 if self.state.end_on_io_error { return Ok(false) }
885 },
886 ReaderEofState::NotEof => {}
887 }
888 let (mut outlen, mut endlen) = (0, 0);
889 loop {
890 let (res, nin, nout, nend) = {
891 if let Err(err) = FillBuf::new(&mut self.rdr).await {
892 self.state.eof = ReaderEofState::IOError;
893 return Err(err.into());
894 }
895 let (fields, ends) = record.as_parts();
896 self.core.read_record(
897 self.rdr.buffer(),
898 &mut fields[outlen..],
899 &mut ends[endlen..],
900 )
901 };
902 Pin::new(&mut self.rdr).consume(nin);
903 let byte = self.state.cur_pos.byte();
904 self.state
905 .cur_pos
906 .set_byte(byte + nin as u64)
907 .set_line(self.core.line());
908 outlen += nout;
909 endlen += nend;
910 match res {
911 InputEmpty => continue,
912 OutputFull => {
913 record.expand_fields();
914 continue;
915 }
916 OutputEndsFull => {
917 record.expand_ends();
918 continue;
919 }
920 Record => {
921 record.set_len(endlen);
922 self.state.add_record(record)?;
923 return Ok(true);
924 }
925 End => {
926 self.state.eof = ReaderEofState::Eof;
927 return Ok(false);
928 }
929 }
930 }
931 }
932
    /// Return the current position of this CSV reader.
    ///
    /// This is the position tracked in the reader state; it is advanced as
    /// input is consumed while reading records.
    #[inline]
    pub fn position(&self) -> &Position {
        &self.state.cur_pos
    }
939
940 /// Returns true if and only if this reader has been exhausted.
941 ///
942 pub fn is_done(&self) -> bool {
943 self.state.eof != ReaderEofState::NotEof
944 }
945
    /// Returns true if and only if this reader has been configured to
    /// interpret the first record as a header record.
    ///
    /// This reflects the value copied from the builder when the reader was
    /// created.
    pub fn has_headers(&self) -> bool {
        self.state.has_headers
    }
951
    /// Returns a reference to the underlying reader, i.e. the reader wrapped
    /// by this CSV reader's internal buffered reader.
    pub fn get_ref(&self) -> &R {
        self.rdr.get_ref()
    }
956
    /// Returns a mutable reference to the underlying reader.
    ///
    /// NOTE(review): the reader sits behind an internal buffer, so reading
    /// from it directly presumably skips data that is already buffered;
    /// confirm against the `BufReader` documentation before relying on it.
    pub fn get_mut(&mut self) -> &mut R {
        self.rdr.get_mut()
    }
961
    /// Unwraps this CSV reader, returning the underlying reader.
    ///
    /// Note that any leftover data inside this reader's internal buffer is
    /// lost.
    pub fn into_inner(self) -> R {
        // The parser and reader state are dropped along with `self`.
        self.rdr.into_inner()
    }
969}
970
971impl<R: io::AsyncRead + io::AsyncSeek + Unpin> AsyncReaderImpl<R> {
972 /// Seeks the underlying reader to the position given.
973 ///
974 pub async fn seek(&mut self, pos: Position) -> Result<()> {
975 self.byte_headers().await?;
976 self.state.seeked = true;
977 if pos.byte() == self.state.cur_pos.byte() {
978 return Ok(());
979 }
980 self.rdr.seek(io::SeekFrom::Start(pos.byte())).await?;
981 self.core.reset();
982 self.core.set_line(pos.line());
983 self.state.cur_pos = pos;
984 self.state.eof = ReaderEofState::NotEof;
985 Ok(())
986 }
987
988 /// This is like `seek`, but provides direct control over how the seeking
989 /// operation is performed via `io::SeekFrom`.
990 pub async fn seek_raw(
991 &mut self,
992 seek_from: io::SeekFrom,
993 pos: Position,
994 ) -> Result<()> {
995 self.byte_headers().await?;
996 self.state.seeked = true;
997 self.rdr.seek(seek_from).await?;
998 self.core.reset();
999 self.core.set_line(pos.line());
1000 self.state.cur_pos = pos;
1001 self.state.eof = ReaderEofState::NotEof;
1002 Ok(())
1003 }
1004
1005 /// Seeks the underlying reader to first data record.
1006 ///
1007 #[cfg(feature = "tokio")]
1008 pub async fn rewind(&mut self) -> Result<()> {
1009 self.byte_headers().await?;
1010 self.state.seeked = false;
1011 self.state.headers = None;
1012 self.state.first = false;
1013 if self.state.cur_pos.byte() == 0 {
1014 return Ok(());
1015 }
1016 self.rdr.rewind().await?;
1017 self.core.reset();
1018 self.core.set_line(1);
1019 self.state.cur_pos.set_byte(0).set_line(1).set_record(0);
1020 self.state.eof = ReaderEofState::NotEof;
1021 Ok(())
1022 }
1023 #[cfg(not(feature = "tokio"))]
1024 pub async fn rewind(&mut self) -> Result<()> {
1025 self.byte_headers().await?;
1026 self.state.seeked = false;
1027 self.state.headers = None;
1028 self.state.first = false;
1029 if self.state.cur_pos.byte() == 0 {
1030 return Ok(());
1031 }
1032 self.rdr.seek(io::SeekFrom::Start(0)).await?;
1033 self.core.reset();
1034 self.core.set_line(1);
1035 self.state.cur_pos.set_byte(0).set_line(1).set_record(0);
1036 self.state.eof = ReaderEofState::NotEof;
1037 Ok(())
1038 }
1039}
1040
1041
1042//-//////////////////////////////////////////////////////////////////////////////////////////////
1043//-//////////////////////////////////////////////////////////////////////////////////////////////
1044
1045async fn read_record_borrowed<R>(
1046 rdr: &mut AsyncReaderImpl<R>,
1047 mut rec: StringRecord,
1048) -> (Option<Result<StringRecord>>, &mut AsyncReaderImpl<R>, StringRecord)
1049where
1050 R: io::AsyncRead + Unpin
1051{
1052 let result = match rdr.read_record(&mut rec).await {
1053 Err(err) => Some(Err(err)),
1054 Ok(true) => Some(Ok(rec.clone())),
1055 Ok(false) => None,
1056 };
1057
1058 (result, rdr, rec)
1059}
1060
1061/// A borrowed stream of records as strings.
1062///
1063/// The lifetime parameter `'r` refers to the lifetime of the underlying
1064/// CSV `Reader`.
1065#[allow(clippy::type_complexity)]
1066pub struct StringRecordsStream<'r, R>
1067where
1068 R: io::AsyncRead + Unpin + Send
1069{
1070 fut: Option<
1071 Pin<
1072 Box<
1073 dyn Future<
1074 Output = (
1075 Option<Result<StringRecord>>,
1076 &'r mut AsyncReaderImpl<R>,
1077 StringRecord,
1078 ),
1079 > + Send + 'r,
1080 >,
1081 >,
1082 >,
1083}
1084
1085impl<'r, R> StringRecordsStream<'r, R>
1086where
1087 R: io::AsyncRead + Unpin + Send
1088{
1089 fn new(rdr: &'r mut AsyncReaderImpl<R>) -> Self {
1090 Self {
1091 fut: Some(Pin::from(Box::new(read_record_borrowed(
1092 rdr,
1093 StringRecord::new(),
1094 )))),
1095 }
1096 }
1097}
1098
1099impl<'r, R> Stream for StringRecordsStream<'r, R>
1100where
1101 R: io::AsyncRead + Unpin + Send
1102{
1103 type Item = Result<StringRecord>;
1104
1105 fn poll_next(
1106 mut self: Pin<&mut Self>,
1107 cx: &mut Context,
1108 ) -> Poll<Option<Self::Item>> {
1109 if let Some(fut) = self.fut.as_mut() {
1110 match fut.as_mut().poll(cx) {
1111 Poll::Ready((result, rdr, rec)) => {
1112 if result.is_some() {
1113 self.fut =
1114 Some(Pin::from(Box::new(read_record_borrowed(rdr, rec))));
1115 } else {
1116 self.fut = None;
1117 }
1118 Poll::Ready(result)
1119 }
1120 Poll::Pending => Poll::Pending,
1121 }
1122 } else {
1123 Poll::Ready(None)
1124 }
1125 }
1126}
1127
1128//-//////////////////////////////////////////////////////////////////////////////////////////////
1129//-//////////////////////////////////////////////////////////////////////////////////////////////
1130
1131async fn read_record<R>(
1132 mut rdr: AsyncReaderImpl<R>,
1133 mut rec: StringRecord,
1134) -> (Option<Result<StringRecord>>, AsyncReaderImpl<R>, StringRecord)
1135where
1136 R: io::AsyncRead + Unpin
1137{
1138 let result = match rdr.read_record(&mut rec).await {
1139 Err(err) => Some(Err(err)),
1140 Ok(true) => Some(Ok(rec.clone())),
1141 Ok(false) => None,
1142 };
1143
1144 (result, rdr, rec)
1145}
1146
1147/// An owned stream of records as strings.
1148#[allow(clippy::type_complexity)]
1149pub struct StringRecordsIntoStream<'r, R>
1150where
1151 R: io::AsyncRead + Unpin + Send
1152{
1153 fut: Option<
1154 Pin<
1155 Box<
1156 dyn Future<
1157 Output = (
1158 Option<Result<StringRecord>>,
1159 AsyncReaderImpl<R>,
1160 StringRecord,
1161 ),
1162 > + Send + 'r,
1163 >,
1164 >,
1165 >,
1166}
1167
1168impl<'r, R> StringRecordsIntoStream<'r, R>
1169where
1170 R: io::AsyncRead + Unpin + Send + 'r
1171{
1172 fn new(rdr: AsyncReaderImpl<R>) -> Self {
1173 Self {
1174 fut: Some(Pin::from(Box::new(read_record(
1175 rdr,
1176 StringRecord::new(),
1177 )))),
1178 }
1179 }
1180}
1181
1182impl<'r, R> Stream for StringRecordsIntoStream<'r, R>
1183where
1184 R: io::AsyncRead + Unpin + Send + 'r
1185{
1186 type Item = Result<StringRecord>;
1187
1188 fn poll_next(
1189 mut self: Pin<&mut Self>,
1190 cx: &mut Context,
1191 ) -> Poll<Option<Self::Item>> {
1192 if let Some(fut) = self.fut.as_mut() {
1193 match fut.as_mut().poll(cx) {
1194 Poll::Ready((result, rdr, rec)) => {
1195 if result.is_some() {
1196 self.fut =
1197 Some(Pin::from(Box::new(read_record(rdr, rec))));
1198 } else {
1199 self.fut = None;
1200 }
1201
1202 Poll::Ready(result)
1203 }
1204 Poll::Pending => Poll::Pending,
1205 }
1206 } else {
1207 Poll::Ready(None)
1208 }
1209 }
1210}
1211
1212//-//////////////////////////////////////////////////////////////////////////////////////////////
1213//-//////////////////////////////////////////////////////////////////////////////////////////////
1214
1215async fn read_byte_record_borrowed<R>(
1216 rdr: &mut AsyncReaderImpl<R>,
1217 mut rec: ByteRecord,
1218) -> (Option<Result<ByteRecord>>, &mut AsyncReaderImpl<R>, ByteRecord)
1219where
1220 R: io::AsyncRead + Unpin,
1221{
1222 let result = match rdr.read_byte_record(&mut rec).await {
1223 Err(err) => Some(Err(err)),
1224 Ok(true) => Some(Ok(rec.clone())),
1225 Ok(false) => None,
1226 };
1227
1228 (result, rdr, rec)
1229}
1230
1231/// A borrowed stream of records as raw bytes.
1232///
1233/// The lifetime parameter `'r` refers to the lifetime of the underlying
1234/// CSV `Reader`.
1235#[allow(clippy::type_complexity)]
1236pub struct ByteRecordsStream<'r, R>
1237where
1238 R: io::AsyncRead + Unpin + Send,
1239{
1240 fut: Option<
1241 Pin<
1242 Box<
1243 dyn Future<
1244 Output = (
1245 Option<Result<ByteRecord>>,
1246 &'r mut AsyncReaderImpl<R>,
1247 ByteRecord,
1248 ),
1249 > + Send + 'r,
1250 >,
1251 >,
1252 >,
1253}
1254
1255impl<'r, R> ByteRecordsStream<'r, R>
1256where
1257 R: io::AsyncRead + Unpin + Send + 'r,
1258{
1259 fn new(rdr: &'r mut AsyncReaderImpl<R>) -> Self {
1260 Self {
1261 fut: Some(Pin::from(Box::new(read_byte_record_borrowed(
1262 rdr,
1263 ByteRecord::new(),
1264 )))),
1265 }
1266 }
1267}
1268
1269impl<'r, R> Stream for ByteRecordsStream<'r, R>
1270where
1271 R: io::AsyncRead + Send + Unpin,
1272{
1273 type Item = Result<ByteRecord>;
1274
1275 fn poll_next(
1276 mut self: Pin<&mut Self>,
1277 cx: &mut Context,
1278 ) -> Poll<Option<Self::Item>> {
1279 if let Some(fut) = self.fut.as_mut() {
1280 match fut.as_mut().poll(cx) {
1281 Poll::Ready((result, rdr, rec)) => {
1282 if result.is_some() {
1283 self.fut =
1284 Some(Pin::from(Box::new(read_byte_record_borrowed(rdr, rec))));
1285 } else {
1286 self.fut = None;
1287 }
1288 Poll::Ready(result)
1289 }
1290 Poll::Pending => Poll::Pending,
1291 }
1292 } else {
1293 Poll::Ready(None)
1294 }
1295 }
1296}
1297
1298//-//////////////////////////////////////////////////////////////////////////////////////////////
1299//-//////////////////////////////////////////////////////////////////////////////////////////////
1300
1301async fn read_byte_record<R>(
1302 mut rdr: AsyncReaderImpl<R>,
1303 mut rec: ByteRecord,
1304) -> (Option<Result<ByteRecord>>, AsyncReaderImpl<R>, ByteRecord)
1305where
1306 R: io::AsyncRead + Unpin
1307{
1308 let result = match rdr.read_byte_record(&mut rec).await {
1309 Err(err) => Some(Err(err)),
1310 Ok(true) => Some(Ok(rec.clone())),
1311 Ok(false) => None,
1312 };
1313
1314 (result, rdr, rec)
1315}
1316
1317/// An owned stream of records as raw bytes.
1318#[allow(clippy::type_complexity)]
1319pub struct ByteRecordsIntoStream<'r, R>
1320where
1321 R: io::AsyncRead + Unpin + Send
1322{
1323 fut: Option<
1324 Pin<
1325 Box<
1326 dyn Future<
1327 Output = (
1328 Option<Result<ByteRecord>>,
1329 AsyncReaderImpl<R>,
1330 ByteRecord,
1331 ),
1332 > + Send + 'r,
1333 >,
1334 >,
1335 >,
1336}
1337
1338impl<'r, R> ByteRecordsIntoStream<'r, R>
1339where
1340 R: io::AsyncRead + Send + Unpin + 'r
1341{
1342 fn new(rdr: AsyncReaderImpl<R>) -> Self {
1343 Self {
1344 fut: Some(Pin::from(Box::new(read_byte_record(
1345 rdr,
1346 ByteRecord::new(),
1347 )))),
1348 }
1349 }
1350}
1351
1352impl<'r, R> Stream for ByteRecordsIntoStream<'r, R>
1353where
1354 R: io::AsyncRead + Send + Unpin + 'r
1355{
1356 type Item = Result<ByteRecord>;
1357
1358 fn poll_next(
1359 mut self: Pin<&mut Self>,
1360 cx: &mut Context,
1361 ) -> Poll<Option<Self::Item>> {
1362 if let Some(fut) = self.fut.as_mut() {
1363 match fut.as_mut().poll(cx) {
1364 Poll::Ready((result, rdr, rec)) => {
1365 if result.is_some() {
1366 self.fut =
1367 Some(Pin::from(Box::new(read_byte_record(rdr, rec))));
1368 } else {
1369 self.fut = None;
1370 }
1371 Poll::Ready(result)
1372 }
1373 Poll::Pending => Poll::Pending,
1374 }
1375 } else {
1376 Poll::Ready(None)
1377 }
1378 }
1379}
1380
1381//-//////////////////////////////////////////////////////////////////////////////////////////////
1382//-//////////////////////////////////////////////////////////////////////////////////////////////
1383
1384cfg_if::cfg_if! {
1385if #[cfg(feature = "with_serde")] {
1386
1387async fn deserialize_record_borrowed<R, D: DeserializeOwned>(
1388 rdr: &mut AsyncReaderImpl<R>,
1389 headers: Option<StringRecord>,
1390 mut rec: StringRecord,
1391) -> (Option<Result<D>>, &mut AsyncReaderImpl<R>, Option<StringRecord>, StringRecord)
1392where
1393 R: io::AsyncRead + Unpin
1394{
1395 let result = match rdr.read_record(&mut rec).await {
1396 Err(err) => Some(Err(err)),
1397 Ok(true) => Some(rec.deserialize(headers.as_ref())),
1398 Ok(false) => None,
1399 };
1400
1401 (result, rdr, headers, rec)
1402}
1403
1404/// A borrowed stream of deserialized records.
1405///
1406/// The lifetime parameter `'r` refers to the lifetime of the underlying CSV `Reader`.
1407/// type, and `D` refers to the type that this stream will deserialize a record into.
1408#[allow(clippy::type_complexity)]
1409pub struct DeserializeRecordsStream<'r, R, D>
1410where
1411 R: io::AsyncRead + Unpin + Send
1412{
1413 header_fut: Option<
1414 Pin<
1415 Box<
1416 dyn Future<
1417 Output = (
1418 Result<StringRecord>,
1419 &'r mut AsyncReaderImpl<R>,
1420 )
1421 > + Send + 'r,
1422 >,
1423 >,
1424 >,
1425 rec_fut: Option<
1426 Pin<
1427 Box<
1428 dyn Future<
1429 Output = (
1430 Option<Result<D>>,
1431 &'r mut AsyncReaderImpl<R>,
1432 Option<StringRecord>,
1433 StringRecord,
1434 )
1435 > + Send + 'r,
1436 >,
1437 >,
1438 >,
1439}
1440
1441impl<'r, R, D: DeserializeOwned + 'r> DeserializeRecordsStream<'r, R, D>
1442where
1443 R: io::AsyncRead + Unpin + Send
1444{
1445 fn new(rdr: &'r mut AsyncReaderImpl<R>) -> Self {
1446 let has_headers = rdr.has_headers();
1447 if has_headers {
1448 Self {
1449 header_fut: Some(Pin::from(Box::new(
1450 async{ (rdr.headers().await.cloned(), rdr) }
1451 ))),
1452 rec_fut: None,
1453 }
1454 } else {
1455 Self {
1456 header_fut: None,
1457 rec_fut: Some(Pin::from(Box::new(
1458 deserialize_record_borrowed(rdr, None, StringRecord::new())
1459 ))),
1460 }
1461 }
1462 }
1463}
1464
1465impl<'r, R, D: DeserializeOwned + 'r> Stream for DeserializeRecordsStream<'r, R, D>
1466where
1467 R: io::AsyncRead + Unpin + Send
1468{
1469 type Item = Result<D>;
1470
1471 fn poll_next(
1472 mut self: Pin<&mut Self>,
1473 cx: &mut Context,
1474 ) -> Poll<Option<Self::Item>> {
1475 if let Some(header_fut) = &mut self.header_fut {
1476 match header_fut.as_mut().poll(cx) {
1477 Poll::Ready((Ok(headers), rdr)) => {
1478 self.header_fut = None;
1479 self.rec_fut = Some(Pin::from(Box::new(
1480 deserialize_record_borrowed(rdr, Some(headers), StringRecord::new()),
1481 )));
1482 cx.waker().wake_by_ref();
1483 Poll::Pending
1484 },
1485 Poll::Ready((Err(err), rdr)) => {
1486 self.header_fut = None;
1487 self.rec_fut = Some(Pin::from(Box::new(
1488 deserialize_record_borrowed(rdr, None, StringRecord::new()),
1489 )));
1490 Poll::Ready(Some(Err(err)))
1491 },
1492 Poll::Pending => Poll::Pending,
1493 }
1494 } else if let Some(fut) = self.rec_fut.as_mut() {
1495 match fut.as_mut().poll(cx) {
1496 Poll::Ready((result, rdr, headers, rec)) => {
1497 if result.is_some() {
1498 self.rec_fut = Some(Pin::from(Box::new(
1499 deserialize_record_borrowed(rdr, headers, rec),
1500 )));
1501 } else {
1502 self.rec_fut = None;
1503 }
1504 Poll::Ready(result)
1505 }
1506 Poll::Pending => Poll::Pending,
1507 }
1508 } else {
1509 Poll::Ready(None)
1510 }
1511 }
1512}
1513
1514//-//////////////////////////////////////////////////////////////////////////////////////////////
1515//-//////////////////////////////////////////////////////////////////////////////////////////////
1516
1517async fn deserialize_record_with_pos_borrowed<R, D: DeserializeOwned>(
1518 rdr: &mut AsyncReaderImpl<R>,
1519 headers: Option<StringRecord>,
1520 mut rec: StringRecord,
1521) -> (Option<Result<D>>, Position, &mut AsyncReaderImpl<R>, Option<StringRecord>, StringRecord)
1522where
1523 R: io::AsyncRead + Unpin
1524{
1525 let pos = rdr.position().clone();
1526 let result = match rdr.read_record(&mut rec).await {
1527 Err(err) => Some(Err(err)),
1528 Ok(true) => Some(rec.deserialize(headers.as_ref())),
1529 Ok(false) => None,
1530 };
1531
1532 (result, pos, rdr, headers, rec)
1533}
1534
1535/// A borrowed stream of pairs: deserialized records and position in stream before reading record.
1536///
1537/// The lifetime parameter `'r` refers to the lifetime of the underlying CSV `Reader`.
1538/// type, and `D` refers to the type that this stream will deserialize a record into.
1539#[allow(clippy::type_complexity)]
1540pub struct DeserializeRecordsStreamPos<'r, R, D>
1541where
1542 R: io::AsyncRead + Unpin + Send
1543{
1544 header_fut: Option<
1545 Pin<
1546 Box<
1547 dyn Future<
1548 Output = (
1549 Result<StringRecord>,
1550 &'r mut AsyncReaderImpl<R>,
1551 )
1552 > + Send + 'r,
1553 >,
1554 >,
1555 >,
1556 rec_fut: Option<
1557 Pin<
1558 Box<
1559 dyn Future<
1560 Output = (
1561 Option<Result<D>>,
1562 Position,
1563 &'r mut AsyncReaderImpl<R>,
1564 Option<StringRecord>,
1565 StringRecord,
1566 )
1567 > + Send + 'r,
1568 >,
1569 >,
1570 >,
1571}
1572
1573impl<'r, R, D: DeserializeOwned + 'r> DeserializeRecordsStreamPos<'r, R, D>
1574where
1575 R: io::AsyncRead + Unpin + Send
1576{
1577 fn new(rdr: &'r mut AsyncReaderImpl<R>) -> Self {
1578 let has_headers = rdr.has_headers();
1579 if has_headers {
1580 Self {
1581 header_fut: Some(Pin::from(Box::new(
1582 async{ (rdr.headers().await.cloned(), rdr) }
1583 ))),
1584 rec_fut: None,
1585 }
1586 } else {
1587 Self {
1588 header_fut: None,
1589 rec_fut: Some(Pin::from(Box::new(
1590 deserialize_record_with_pos_borrowed(rdr, None, StringRecord::new())
1591 ))),
1592 }
1593 }
1594 }
1595}
1596
1597impl<'r, R, D: DeserializeOwned + 'r> Stream for DeserializeRecordsStreamPos<'r, R, D>
1598where
1599 R: io::AsyncRead + Unpin + Send
1600{
1601 type Item = (Result<D>, Position);
1602
1603 fn poll_next(
1604 mut self: Pin<&mut Self>,
1605 cx: &mut Context,
1606 ) -> Poll<Option<Self::Item>> {
1607 if let Some(header_fut) = &mut self.header_fut {
1608 match header_fut.as_mut().poll(cx) {
1609 Poll::Ready((Ok(headers), rdr)) => {
1610 self.header_fut = None;
1611 self.rec_fut = Some(Pin::from(Box::new(
1612 deserialize_record_with_pos_borrowed(rdr, Some(headers), StringRecord::new()),
1613 )));
1614 cx.waker().wake_by_ref();
1615 Poll::Pending
1616 },
1617 Poll::Ready((Err(err), rdr)) => {
1618 self.header_fut = None;
1619 let pos = rdr.position().clone();
1620 self.rec_fut = Some(Pin::from(Box::new(
1621 deserialize_record_with_pos_borrowed(rdr, None, StringRecord::new()),
1622 )));
1623 Poll::Ready(Some((Err(err), pos)))
1624 },
1625 Poll::Pending => Poll::Pending,
1626 }
1627 } else if let Some(fut) = self.rec_fut.as_mut() {
1628 match fut.as_mut().poll(cx) {
1629 Poll::Ready((result, pos, rdr, headers, rec)) => {
1630 if let Some(result) = result {
1631 self.rec_fut = Some(Pin::from(Box::new(
1632 deserialize_record_with_pos_borrowed(rdr, headers, rec),
1633 )));
1634 Poll::Ready(Some((result, pos)))
1635 } else {
1636 self.rec_fut = None;
1637 Poll::Ready(None)
1638 }
1639 }
1640 Poll::Pending => Poll::Pending,
1641 }
1642 } else {
1643 Poll::Ready(None)
1644 }
1645 }
1646}
1647
1648//-//////////////////////////////////////////////////////////////////////////////////////////////
1649//-//////////////////////////////////////////////////////////////////////////////////////////////
1650
1651async fn deserialize_record<R, D: DeserializeOwned>(
1652 mut rdr: AsyncReaderImpl<R>,
1653 headers: Option<StringRecord>,
1654 mut rec: StringRecord,
1655) -> (Option<Result<D>>, AsyncReaderImpl<R>, Option<StringRecord>, StringRecord)
1656where
1657 R: io::AsyncRead + Unpin
1658{
1659 let result = match rdr.read_record(&mut rec).await {
1660 Err(err) => Some(Err(err)),
1661 Ok(true) => Some(rec.deserialize(headers.as_ref())),
1662 Ok(false) => None,
1663 };
1664
1665 (result, rdr, headers, rec)
1666}
1667
1668/// A owned stream of deserialized records.
1669///
1670/// The lifetime parameter `'r` refers to the lifetime of the underlying CSV `Reader`.
1671/// type, and `D` refers to the type that this stream will deserialize a record into.
1672#[allow(clippy::type_complexity)]
1673pub struct DeserializeRecordsIntoStream<'r, R, D>
1674where
1675 R: io::AsyncRead + Unpin + Send
1676{
1677 header_fut: Option<
1678 Pin<
1679 Box<
1680 dyn Future<
1681 Output = (
1682 Result<StringRecord>,
1683 AsyncReaderImpl<R>,
1684 )
1685 > + Send + 'r,
1686 >,
1687 >,
1688 >,
1689 rec_fut: Option<
1690 Pin<
1691 Box<
1692 dyn Future<
1693 Output = (
1694 Option<Result<D>>,
1695 AsyncReaderImpl<R>,
1696 Option<StringRecord>,
1697 StringRecord,
1698 )
1699 > + Send + 'r,
1700 >,
1701 >,
1702 >,
1703}
1704
1705impl<'r, R, D: DeserializeOwned + 'r> DeserializeRecordsIntoStream<'r, R, D>
1706where
1707 R: io::AsyncRead + Unpin + Send + 'r
1708{
1709 fn new(mut rdr: AsyncReaderImpl<R>) -> Self {
1710 let has_headers = rdr.has_headers();
1711 if has_headers {
1712 Self {
1713 header_fut: Some(Pin::from(Box::new(
1714 async{ (rdr.headers().await.cloned(), rdr) }
1715 ))),
1716 rec_fut: None,
1717 }
1718 } else {
1719 Self {
1720 header_fut: None,
1721 rec_fut: Some(Pin::from(Box::new(
1722 deserialize_record(rdr, None, StringRecord::new())
1723 ))),
1724 }
1725 }
1726 }
1727}
1728
1729impl<'r, R, D: DeserializeOwned + 'r> Stream for DeserializeRecordsIntoStream<'r, R, D>
1730where
1731 R: io::AsyncRead + Unpin + Send + 'r
1732{
1733 type Item = Result<D>;
1734
1735 fn poll_next(
1736 mut self: Pin<&mut Self>,
1737 cx: &mut Context,
1738 ) -> Poll<Option<Self::Item>> {
1739 if let Some(header_fut) = &mut self.header_fut {
1740 match header_fut.as_mut().poll(cx) {
1741 Poll::Ready((Ok(headers), rdr)) => {
1742 self.header_fut = None;
1743 self.rec_fut = Some(Pin::from(Box::new(
1744 deserialize_record(rdr, Some(headers), StringRecord::new()),
1745 )));
1746 cx.waker().wake_by_ref();
1747 Poll::Pending
1748 },
1749 Poll::Ready((Err(err), rdr)) => {
1750 self.header_fut = None;
1751 self.rec_fut = Some(Pin::from(Box::new(
1752 deserialize_record(rdr, None, StringRecord::new()),
1753 )));
1754 Poll::Ready(Some(Err(err)))
1755 },
1756 Poll::Pending => Poll::Pending,
1757 }
1758 } else if let Some(fut) = self.rec_fut.as_mut() {
1759 match fut.as_mut().poll(cx) {
1760 Poll::Ready((result, rdr, headers, rec)) => {
1761 if result.is_some() {
1762 self.rec_fut = Some(Pin::from(Box::new(
1763 deserialize_record(rdr, headers, rec),
1764 )));
1765 } else {
1766 self.rec_fut = None;
1767 }
1768 Poll::Ready(result)
1769 }
1770 Poll::Pending => Poll::Pending,
1771 }
1772 } else {
1773 Poll::Ready(None)
1774 }
1775 }
1776}
1777
1778//-//////////////////////////////////////////////////////////////////////////////////////////////
1779//-//////////////////////////////////////////////////////////////////////////////////////////////
1780
1781async fn deserialize_record_with_pos<R, D: DeserializeOwned>(
1782 mut rdr: AsyncReaderImpl<R>,
1783 headers: Option<StringRecord>,
1784 mut rec: StringRecord,
1785) -> (Option<Result<D>>, Position, AsyncReaderImpl<R>, Option<StringRecord>, StringRecord)
1786where
1787 R: io::AsyncRead + Unpin
1788{
1789 let pos = rdr.position().clone();
1790 let result = match rdr.read_record(&mut rec).await {
1791 Err(err) => Some(Err(err)),
1792 Ok(true) => Some(rec.deserialize(headers.as_ref())),
1793 Ok(false) => None,
1794 };
1795
1796 (result, pos, rdr, headers, rec)
1797}
1798
1799/// A owned stream of pairs: deserialized records and position in stream before reading record.
1800///
1801/// The lifetime parameter `'r` refers to the lifetime of the underlying CSV `Reader`.
1802/// type, and `D` refers to the type that this stream will deserialize a record into.
1803#[allow(clippy::type_complexity)]
1804pub struct DeserializeRecordsIntoStreamPos<'r, R, D>
1805where
1806 R: io::AsyncRead + Unpin + Send
1807{
1808 header_fut: Option<
1809 Pin<
1810 Box<
1811 dyn Future<
1812 Output = (
1813 Result<StringRecord>,
1814 AsyncReaderImpl<R>,
1815 )
1816 > + Send + 'r,
1817 >,
1818 >,
1819 >,
1820 rec_fut: Option<
1821 Pin<
1822 Box<
1823 dyn Future<
1824 Output = (
1825 Option<Result<D>>,
1826 Position,
1827 AsyncReaderImpl<R>,
1828 Option<StringRecord>,
1829 StringRecord,
1830 )
1831 > + Send + 'r,
1832 >,
1833 >,
1834 >,
1835}
1836
1837impl<'r, R, D: DeserializeOwned + 'r> DeserializeRecordsIntoStreamPos<'r, R, D>
1838where
1839 R: io::AsyncRead + Unpin + Send + 'r
1840{
1841 fn new(mut rdr: AsyncReaderImpl<R>) -> Self {
1842 let has_headers = rdr.has_headers();
1843 if has_headers {
1844 Self {
1845 header_fut: Some(Pin::from(Box::new(
1846 async{ (rdr.headers().await.cloned(), rdr) }
1847 ))),
1848 rec_fut: None,
1849 }
1850 } else {
1851 Self {
1852 header_fut: None,
1853 rec_fut: Some(Pin::from(Box::new(
1854 deserialize_record_with_pos(rdr, None, StringRecord::new())
1855 ))),
1856 }
1857 }
1858 }
1859}
1860
1861impl<'r, R, D: DeserializeOwned + 'r> Stream for DeserializeRecordsIntoStreamPos<'r, R, D>
1862where
1863 R: io::AsyncRead + Unpin + Send + 'r
1864{
1865 type Item = (Result<D>, Position);
1866
1867 fn poll_next(
1868 mut self: Pin<&mut Self>,
1869 cx: &mut Context,
1870 ) -> Poll<Option<Self::Item>> {
1871 if let Some(header_fut) = &mut self.header_fut {
1872 match header_fut.as_mut().poll(cx) {
1873 Poll::Ready((Ok(headers), rdr)) => {
1874 self.header_fut = None;
1875 self.rec_fut = Some(Pin::from(Box::new(
1876 deserialize_record_with_pos(rdr, Some(headers), StringRecord::new()),
1877 )));
1878 cx.waker().wake_by_ref();
1879 Poll::Pending
1880 },
1881 Poll::Ready((Err(err), rdr)) => {
1882 self.header_fut = None;
1883 let pos = rdr.position().clone();
1884 self.rec_fut = Some(Pin::from(Box::new(
1885 deserialize_record_with_pos(rdr, None, StringRecord::new()),
1886 )));
1887 Poll::Ready(Some((Err(err), pos)))
1888 },
1889 Poll::Pending => Poll::Pending,
1890 }
1891 } else if let Some(fut) = self.rec_fut.as_mut() {
1892 match fut.as_mut().poll(cx) {
1893 Poll::Ready((result, pos, rdr, headers, rec)) => {
1894 if let Some(result) = result {
1895 self.rec_fut = Some(Pin::from(Box::new(
1896 deserialize_record_with_pos(rdr, headers, rec),
1897 )));
1898 Poll::Ready(Some((result, pos)))
1899 } else {
1900 self.rec_fut = None;
1901 Poll::Ready(None)
1902 }
1903 }
1904 Poll::Pending => Poll::Pending,
1905 }
1906 } else {
1907 Poll::Ready(None)
1908 }
1909 }
1910}
1911
1912}} // fi #[cfg(feature = "with_serde")]
1913