Struct csv_async::AsyncReader
pub struct AsyncReader<R>(/* private fields */);
An already configured CSV reader for the tokio runtime.
A CSV reader takes as input CSV data and transforms that into standard Rust values. The reader reads CSV data as a sequence of records, where a record is a sequence of fields and each field is a string.
Configuration
A CSV reader has a convenient constructor method, from_reader. However, if you want to configure the CSV reader to use a different delimiter or quote character (among many other things), then you should use an AsyncReaderBuilder to construct an AsyncReader. For example, to change the field delimiter:
use std::error::Error;
use csv_async::AsyncReaderBuilder;
use tokio_stream::StreamExt;
async fn example() -> Result<(), Box<dyn Error>> {
let data = "\
city;country;pop
Boston;United States;4628910
";
let mut rdr = AsyncReaderBuilder::new()
.delimiter(b';')
.create_reader(data.as_bytes());
let mut records = rdr.records();
assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
Ok(())
}
Error handling
In general, CSV parsing does not ever return an error. That is, there is no such thing as malformed CSV data. Instead, this reader will prioritize finding a parse over rejecting CSV data that it does not understand. This choice was inspired by other popular CSV parsers, but also because it is pragmatic. CSV data varies wildly, so even if the CSV data is malformed, it might still be possible to work with the data. In the land of CSV, there is no “right” or “wrong,” only “right” and “less right.”
With that said, a number of errors can occur while reading CSV data:
- By default, all records in CSV data must have the same number of fields. If a record is found with a different number of fields than a prior record, then an error is returned. This behavior can be disabled by enabling flexible parsing via the flexible method on AsyncReaderBuilder (see the sketch below).
- When reading CSV data from a resource (like a file), it is possible for reading from the underlying resource to fail. This will return an error. For subsequent calls to the reader after encountering such an error (unless seek is used), it will behave as if end of file had been reached, in order to avoid running into infinite loops when still attempting to read the next record when one has errored.
- When reading CSV data into String or &str fields (e.g., via a StringRecord), UTF-8 is strictly enforced. If CSV data is invalid UTF-8, then an error is returned. If you want to read invalid UTF-8, then you should use the byte oriented APIs such as ByteRecord. If you need explicit support for another encoding entirely, then you'll need to use another crate to transcode your CSV data to UTF-8 before parsing it.
- When using Serde to deserialize CSV data into Rust types, it is possible for a number of additional errors to occur. For example, deserializing a field xyz into an i32 field will result in an error.
For more details on the precise semantics of errors, see the Error type.
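To illustrate the first point above, here is a minimal sketch of flexible parsing. It assumes AsyncReaderBuilder::flexible accepts a bool, mirroring the synchronous csv crate's builder:
use std::error::Error;
use csv_async::AsyncReaderBuilder;
use tokio_stream::StreamExt;
async fn example() -> Result<(), Box<dyn Error>> {
    // The second data record has only two fields; with strict (default)
    // parsing this record would be rejected with an error.
    let data = "\
city,country,pop
Boston,United States,4628910
Concord,United States
";
    let mut rdr = AsyncReaderBuilder::new()
        .flexible(true)
        .create_reader(data.as_bytes());
    let mut records = rdr.records();
    assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
    assert_eq!(records.next().await.unwrap()?, vec!["Concord", "United States"]);
    Ok(())
}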
Implementations
impl<'r, R> AsyncReader<R> where R: AsyncRead + Unpin + Send + 'r
pub fn from_reader(rdr: R) -> AsyncReader<R>
Create a new CSV parser with a default configuration for the given reader.
To customize CSV parsing, use an AsyncReaderBuilder.
Example
use std::error::Error;
use csv_async::AsyncReader;
use tokio_stream::StreamExt;
async fn example() -> Result<(), Box<dyn Error>> {
let data = "\
city,country,pop
Boston,United States,4628910
Concord,United States,42695
";
let mut rdr = AsyncReader::from_reader(data.as_bytes());
let mut records = rdr.into_records();
while let Some(record) = records.next().await {
println!("{:?}", record?);
}
Ok(())
}
pub fn records(&mut self) -> StringRecordsStream<'_, R>
Returns a borrowed iterator over all records as strings.
Each item yielded by this iterator is a Result<StringRecord, Error>. Therefore, in order to access the record, callers must handle the possibility of error (typically with try! or ?).
If has_headers was enabled via an AsyncReaderBuilder (which is the default), then this does not include the first record.
Example
use std::error::Error;
use csv_async::AsyncReader;
use tokio_stream::StreamExt;
async fn example() -> Result<(), Box<dyn Error>> {
let data = "\
city,country,pop
Boston,United States,4628910
";
let mut rdr = AsyncReader::from_reader(data.as_bytes());
let mut records = rdr.records();
while let Some(record) = records.next().await {
println!("{:?}", record?);
}
Ok(())
}
pub fn into_records(self) -> StringRecordsIntoStream<'r, R>
Returns an owned iterator over all records as strings.
Each item yielded by this iterator is a Result<StringRecord, Error>. Therefore, in order to access the record, callers must handle the possibility of error (typically with try! or ?).
This is mostly useful when you want to return a CSV iterator or store it somewhere.
If has_headers was enabled via an AsyncReaderBuilder (which is the default), then this does not include the first record.
Example
use std::error::Error;
use csv_async::AsyncReader;
use tokio_stream::StreamExt;
async fn example() -> Result<(), Box<dyn Error>> {
let data = "\
city,country,pop
Boston,United States,4628910
";
let rdr = AsyncReader::from_reader(data.as_bytes());
let mut records = rdr.into_records();
while let Some(record) = records.next().await {
println!("{:?}", record?);
}
Ok(())
}
pub fn byte_records(&mut self) -> ByteRecordsStream<'_, R>
Returns a borrowed iterator over all records as raw bytes.
Each item yielded by this iterator is a Result<ByteRecord, Error>. Therefore, in order to access the record, callers must handle the possibility of error (typically with try! or ?).
If has_headers was enabled via an AsyncReaderBuilder (which is the default), then this does not include the first record.
Example
use std::error::Error;
use csv_async::AsyncReader;
use tokio_stream::StreamExt;
async fn example() -> Result<(), Box<dyn Error>> {
let data = "\
city,country,pop
Boston,United States,4628910
";
let mut rdr = AsyncReader::from_reader(data.as_bytes());
let mut iter = rdr.byte_records();
assert_eq!(iter.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
assert!(iter.next().await.is_none());
Ok(())
}
pub fn into_byte_records(self) -> ByteRecordsIntoStream<'r, R>
Returns an owned iterator over all records as raw bytes.
Each item yielded by this iterator is a Result<ByteRecord, Error>. Therefore, in order to access the record, callers must handle the possibility of error (typically with try! or ?).
This is mostly useful when you want to return a CSV iterator or store it somewhere.
If has_headers was enabled via an AsyncReaderBuilder (which is the default), then this does not include the first record.
Example
use std::error::Error;
use csv_async::AsyncReader;
use tokio_stream::StreamExt;
async fn example() -> Result<(), Box<dyn Error>> {
let data = "\
city,country,pop
Boston,United States,4628910
";
let rdr = AsyncReader::from_reader(data.as_bytes());
let mut iter = rdr.into_byte_records();
assert_eq!(iter.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
assert!(iter.next().await.is_none());
Ok(())
}
pub async fn headers(&mut self) -> Result<&StringRecord>
Returns a reference to the first row read by this parser.
If no row has been read yet, then this will force parsing of the first row.
If there was a problem parsing the row or if it wasn’t valid UTF-8, then this returns an error.
If the underlying reader emits EOF before any data, then this returns an empty record.
Note that this method may be used regardless of whether has_headers was enabled (but it is enabled by default).
Example
This example shows how to get the header row of CSV data. Notice that the header row does not appear as a record in the iterator!
use std::error::Error;
use csv_async::AsyncReader;
use tokio_stream::StreamExt;
async fn example() -> Result<(), Box<dyn Error>> {
let data = "\
city,country,pop
Boston,United States,4628910
";
let mut rdr = AsyncReader::from_reader(data.as_bytes());
// We can read the headers before iterating.
{
// `headers` borrows from the reader, so we put this in its
// own scope. That way, the borrow ends before we try iterating
// below. Alternatively, we could clone the headers.
let headers = rdr.headers().await?;
assert_eq!(headers, vec!["city", "country", "pop"]);
}
{
let mut records = rdr.records();
assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
assert!(records.next().await.is_none());
}
// We can also read the headers after iterating.
let headers = rdr.headers().await?;
assert_eq!(headers, vec!["city", "country", "pop"]);
Ok(())
}
pub async fn byte_headers(&mut self) -> Result<&ByteRecord>
Returns a reference to the first row read by this parser as raw bytes.
If no row has been read yet, then this will force parsing of the first row.
If there was a problem parsing the row then this returns an error.
If the underlying reader emits EOF before any data, then this returns an empty record.
Note that this method may be used regardless of whether has_headers was enabled (but it is enabled by default).
Example
This example shows how to get the header row of CSV data. Notice that the header row does not appear as a record in the iterator!
use std::error::Error;
use csv_async::AsyncReader;
use tokio_stream::StreamExt;
async fn example() -> Result<(), Box<dyn Error>> {
let data = "\
city,country,pop
Boston,United States,4628910
";
let mut rdr = AsyncReader::from_reader(data.as_bytes());
// We can read the headers before iterating.
{
// `headers` borrows from the reader, so we put this in its
// own scope. That way, the borrow ends before we try iterating
// below. Alternatively, we could clone the headers.
let headers = rdr.byte_headers().await?;
assert_eq!(headers, vec!["city", "country", "pop"]);
}
{
let mut records = rdr.byte_records();
assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
assert!(records.next().await.is_none());
}
// We can also read the headers after iterating.
let headers = rdr.byte_headers().await?;
assert_eq!(headers, vec!["city", "country", "pop"]);
Ok(())
}
pub fn set_headers(&mut self, headers: StringRecord)
Set the headers of this CSV parser manually.
This overrides any other setting (including set_byte_headers). Any automatic detection of headers is disabled. This may be called at any time.
Example
use std::error::Error;
use csv_async::{AsyncReader, StringRecord};
async fn example() -> Result<(), Box<dyn Error>> {
let data = "\
city,country,pop
Boston,United States,4628910
";
let mut rdr = AsyncReader::from_reader(data.as_bytes());
assert_eq!(rdr.headers().await?, vec!["city", "country", "pop"]);
rdr.set_headers(StringRecord::from(vec!["a", "b", "c"]));
assert_eq!(rdr.headers().await?, vec!["a", "b", "c"]);
Ok(())
}
pub fn set_byte_headers(&mut self, headers: ByteRecord)
Set the headers of this CSV parser manually as raw bytes.
This overrides any other setting (including set_headers). Any automatic detection of headers is disabled. This may be called at any time.
Example
use std::error::Error;
use csv_async::{AsyncReader, ByteRecord};
async fn example() -> Result<(), Box<dyn Error>> {
let data = "\
city,country,pop
Boston,United States,4628910
";
let mut rdr = AsyncReader::from_reader(data.as_bytes());
assert_eq!(rdr.byte_headers().await?, vec!["city", "country", "pop"]);
rdr.set_byte_headers(ByteRecord::from(vec!["a", "b", "c"]));
assert_eq!(rdr.byte_headers().await?, vec!["a", "b", "c"]);
Ok(())
}
pub async fn read_record(&mut self, record: &mut StringRecord) -> Result<bool>
Read a single row into the given record. Returns false when no more records could be read.
If has_headers was enabled via an AsyncReaderBuilder (which is the default), then this will treat the initial row as headers and read the first data record.
This method is useful when you want to read records as fast as possible. It's less ergonomic than an iterator, but it permits the caller to reuse the StringRecord allocation, which usually results in higher throughput.
Records read via this method are guaranteed to have a position set on them, even if the reader is at EOF or if an error is returned.
Example
use std::error::Error;
use csv_async::{AsyncReader, StringRecord};
async fn example() -> Result<(), Box<dyn Error>> {
let data = "\
city,country,pop
Boston,United States,4628910
";
let mut rdr = AsyncReader::from_reader(data.as_bytes());
let mut record = StringRecord::new();
if rdr.read_record(&mut record).await? {
assert_eq!(record, vec!["Boston", "United States", "4628910"]);
Ok(())
} else {
Err(From::from("expected at least one record but got none"))
}
}
pub async fn read_byte_record(&mut self, record: &mut ByteRecord) -> Result<bool>
Read a single row into the given byte record. Returns false when no more records could be read.
If has_headers was enabled via an AsyncReaderBuilder (which is the default), then this will treat the initial row as headers and read the first data record.
This method is useful when you want to read records as fast as possible. It's less ergonomic than an iterator, but it permits the caller to reuse the ByteRecord allocation, which usually results in higher throughput.
Records read via this method are guaranteed to have a position set on them, even if the reader is at EOF or if an error is returned.
Example
use std::error::Error;
use csv_async::{ByteRecord, AsyncReader};
async fn example() -> Result<(), Box<dyn Error>> {
let data = "\
city,country,pop
Boston,United States,4628910
";
let mut rdr = AsyncReader::from_reader(data.as_bytes());
let mut record = ByteRecord::new();
if rdr.read_byte_record(&mut record).await? {
assert_eq!(record, vec!["Boston", "United States", "4628910"]);
Ok(())
} else {
Err(From::from("expected at least one record but got none"))
}
}
pub fn position(&self) -> &Position
Return the current position of this CSV reader.
The byte offset in the position returned can be used to seek this reader. In particular, seeking to a position returned here on the same data will result in parsing the same subsequent record.
Example: reading the position
use std::error::Error;
use std::io;
use csv_async::{AsyncReader, Position};
use tokio_stream::StreamExt;
async fn example() -> Result<(), Box<dyn Error>> {
let data = "\
city,country,popcount
Boston,United States,4628910
Concord,United States,42695
";
let rdr = AsyncReader::from_reader(io::Cursor::new(data));
let mut iter = rdr.into_records();
let mut pos = Position::new();
loop {
let next = iter.next().await;
if let Some(next) = next {
pos = next?.position().expect("Cursor should be at some valid position").clone();
} else {
break;
}
}
// `pos` should now be the position immediately before the last
// record.
assert_eq!(pos.byte(), 51);
assert_eq!(pos.line(), 3);
assert_eq!(pos.record(), 2);
Ok(())
}
pub fn is_done(&self) -> bool
Returns true if and only if this reader has been exhausted.
When this returns true, no more records can be read from this reader.
Example
use std::error::Error;
use tokio::io;
use tokio_stream::StreamExt;
use csv_async::{AsyncReader, Position};
async fn example() -> Result<(), Box<dyn Error>> {
let data = "\
city,country,popcount
Boston,United States,4628910
Concord,United States,42695
";
let mut rdr = AsyncReader::from_reader(data.as_bytes());
assert!(!rdr.is_done());
{
let mut records = rdr.records();
while let Some(record) = records.next().await {
let _ = record?;
}
}
assert!(rdr.is_done());
Ok(())
}
pub fn has_headers(&self) -> bool
Returns true if and only if this reader has been configured to interpret the first record as a header record.
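A minimal sketch of checking this flag, assuming the default configuration described above (headers enabled):
use csv_async::AsyncReader;
fn example() {
    let data = "\
city,country,pop
Boston,United States,4628910
";
    // Readers built with from_reader interpret the first record as headers
    // by default.
    let rdr = AsyncReader::from_reader(data.as_bytes());
    assert!(rdr.has_headers());
}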
pub fn into_inner(self) -> R
Unwraps this CSV reader, returning the underlying reader.
Note that any leftover data inside this reader’s internal buffer is lost.
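For illustration, a minimal sketch of recovering the underlying reader after consuming the header row (the type annotation is only for clarity):
use std::error::Error;
use csv_async::AsyncReader;
async fn example() -> Result<(), Box<dyn Error>> {
    let data = "\
city,country,pop
Boston,United States,4628910
";
    let mut rdr = AsyncReader::from_reader(data.as_bytes());
    // Force the header row to be read and buffered.
    let _ = rdr.headers().await?;
    // Take back the underlying reader; any data still held in the CSV
    // reader's internal buffer is discarded at this point.
    let _underlying: &[u8] = rdr.into_inner();
    Ok(())
}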
impl<R: AsyncRead + AsyncSeek + Unpin> AsyncReader<R>
pub async fn seek(&mut self, pos: Position) -> Result<()>
Seeks the underlying reader to the position given.
This comes with a few caveats:
- Any internal buffer associated with this reader is cleared.
- If the given position does not correspond to a position immediately before the start of a record, then the behavior of this reader is unspecified.
- Any special logic that skips the first record in the CSV reader when reading or iterating over records is disabled.
If the given position has a byte offset equivalent to the current position, then no seeking is performed.
If the header row has not already been read, then this will attempt to read the header row before seeking. Therefore, it is possible that this returns an error associated with reading CSV data.
Note that seeking is performed based only on the byte offset in the given position. Namely, the record or line numbers in the position may be incorrect, but this will cause any future position generated by this CSV reader to be similarly incorrect.
Example: seek to parse a record twice
use std::error::Error;
use tokio::io;
use tokio_stream::StreamExt;
use csv_async::{AsyncReader, Position};
async fn example() -> Result<(), Box<dyn Error>> {
let data = "\
city,country,popcount
Boston,United States,4628910
Concord,United States,42695
";
let mut rdr = AsyncReader::from_reader(std::io::Cursor::new(data));
let mut pos = Position::new();
{
let mut records = rdr.records();
loop {
let next = records.next().await;
if let Some(next) = next {
pos = next?.position().expect("Cursor should be at some valid position").clone();
} else {
break;
}
}
}
{
// Now seek the reader back to `pos`. This will let us read the
// last record again.
rdr.seek(pos).await?;
let mut records = rdr.into_records();
if let Some(result) = records.next().await {
let record = result?;
assert_eq!(record, vec!["Concord", "United States", "42695"]);
Ok(())
} else {
Err(From::from("expected at least one record but got none"))
}
}
}
pub async fn seek_raw(&mut self, seek_from: SeekFrom, pos: Position) -> Result<()>
This is like seek, but provides direct control over how the seeking operation is performed via io::SeekFrom. The pos position given should correspond to the position indicated by seek_from, but there is no requirement that it does. If the pos position given is incorrect, then the position information returned by this reader will be similarly incorrect.
If the header row has not already been read, then this will attempt to read the header row before seeking. Therefore, it is possible that this returns an error associated with reading CSV data.
Unlike seek, this will always cause an actual seek to be performed.
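There is no dedicated example here, so the following is a minimal sketch patterned on the seek example above. It assumes SeekFrom is std::io::SeekFrom and that the byte offset of a previously captured Position is passed through unchanged:
use std::error::Error;
use std::io::SeekFrom;
use tokio_stream::StreamExt;
use csv_async::{AsyncReader, Position};
async fn example() -> Result<(), Box<dyn Error>> {
    let data = "\
city,country,popcount
Boston,United States,4628910
Concord,United States,42695
";
    let mut rdr = AsyncReader::from_reader(std::io::Cursor::new(data));
    let mut pos = Position::new();
    {
        // Remember the position of the last record read.
        let mut records = rdr.records();
        while let Some(next) = records.next().await {
            pos = next?.position().expect("record should have a position").clone();
        }
    }
    // Seek directly to that byte offset, hand the matching position back to
    // the reader, and parse the last record again.
    rdr.seek_raw(SeekFrom::Start(pos.byte()), pos).await?;
    let mut records = rdr.into_records();
    assert_eq!(records.next().await.unwrap()?, vec!["Concord", "United States", "42695"]);
    Ok(())
}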
pub async fn rewind(&mut self) -> Result<()>
Rewinds the underlying reader to the first data record.
This function is aware of header presence: after rewind, record iterators will return the first data record (skipping the header row if present), while after a seek to position 0 they will return the header row (even if has_headers is set).
Example: reading the same data multiple times
use std::error::Error;
use tokio::io;
use tokio_stream::StreamExt;
use csv_async::AsyncReader;
async fn example() -> Result<(), Box<dyn Error>> {
let data = "\
city,country,popcount
Boston,United States,4628910
Concord,United States,42695
";
let mut rdr = AsyncReader::from_reader(std::io::Cursor::new(data));
let mut output = Vec::new();
loop {
let mut records = rdr.records();
while let Some(rec) = records.next().await {
output.push(rec?);
}
if output.len() >= 6 {
break;
} else {
drop(records);
rdr.rewind().await?;
}
}
assert_eq!(output,
vec![
vec!["Boston", "United States", "4628910"],
vec!["Concord", "United States", "42695"],
vec!["Boston", "United States", "4628910"],
vec!["Concord", "United States", "42695"],
vec!["Boston", "United States", "4628910"],
vec!["Concord", "United States", "42695"],
]);
Ok(())
}