csv_async/lib.rs
1#![deny(missing_docs)]
2
3// Few unsafe lines are in src/string_record.rs
4// #![deny(unsafe_code)]
5
6/*!
7The `csv-async` crate provides a fast and flexible CSV reader and writer,
which is intended to be run in an asynchronous environment - i.e.
inside `async` functions executed by tasks spawned on an executor.
10This library does not imply using any particular executor.
11Unit tests and documentation snippets use either `async-std` or `tokio` crates.
12Synchronous interface for reading and writing CSV files is not contained in this crate,
13please use `csv` crate for this. This crate attempts to mimic `csv` crate API, but there are some exceptions.
14E.g. configuration builders have `create_...` factory functions instead of `from_...` as in `csv` crate.
15
16# Brief overview
17
18The primary types in this crate are
19[`AsyncReader`](struct.AsyncReader.html)
20and
21[`AsyncWriter`](struct.AsyncWriter.html)
22for reading and writing CSV data respectively.
23Or [`AsyncDeserializer`](struct.AsyncDeserializer.html)
24and
25[`AsyncSerializer`](struct.AsyncSerializer.html)
26for reading and writing CSV data using interfaces generated by `serde_derive` macros.
27
28Correspondingly, to support CSV data with custom field or record delimiters
29(among many other things), you should use either a
30[`AsyncReaderBuilder`](struct.AsyncReaderBuilder.html)
31or a
32[`AsyncWriterBuilder`](struct.AsyncWriterBuilder.html),
33depending on whether you're reading or writing CSV data.
34
35The standard CSV record types are
36[`StringRecord`](struct.StringRecord.html)
37and
38[`ByteRecord`](struct.ByteRecord.html).
39`StringRecord` should be used when you know your data to be valid UTF-8.
40For data that may be invalid UTF-8, `ByteRecord` is suitable.
41
42Finally, the set of errors is described by the
43[`Error`](struct.Error.html)
44type.
45
46The rest of the types in this crate mostly correspond to more detailed errors,
47position information, configuration knobs or iterator types.
48
49# Setup
50
In the root folder of your project run `cargo add csv-async` or `cargo add --features tokio csv-async` to add this crate to your project.
52
53# Examples
54
This example shows how to read and write a CSV file in an asynchronous context and inspect some record details.
56
57Sample input file:
58```csv
59city,region,country,population
60Southborough,MA,United States,9686
61Northbridge,MA,United States,14061
62Marlborough,MA,United States,38334
63Springfield,MA,United States,152227
64Springfield,MO,United States,150443
65Springfield,NJ,United States,14976
66Concord,NH,United States,42605
67```
68
69```no_run
70use std::error::Error;
71use std::process;
72#[cfg(not(feature = "tokio"))]
73use futures::stream::StreamExt;
74#[cfg(not(feature = "tokio"))]
75use async_std::fs::File;
76#[cfg(feature = "tokio")]
77use tokio1 as tokio;
78#[cfg(feature = "tokio")]
79use tokio_stream::StreamExt;
80#[cfg(feature = "tokio")]
81use tokio::fs::File;
82
83async fn filter_by_region(region:&str, file_in:&str, file_out:&str) -> Result<(), Box<dyn Error>> {
84 // Function reads CSV file that has column named "region" at second position (index = 1).
85 // It writes to new file only rows with region equal to passed argument
86 // and removes region column.
87 let mut rdr = csv_async::AsyncReader::from_reader(
88 File::open(file_in).await?
89 );
90 let mut wri = csv_async::AsyncWriter::from_writer(
91 File::create(file_out).await?
92 );
93 wri.write_record(rdr
94 .headers()
95 .await?.into_iter()
96 .filter(|h| *h != "region")
97 ).await?;
98 let mut records = rdr.records();
99 while let Some(record) = records.next().await {
100 let record = record?;
101 match record.get(1) {
102 Some(reg) if reg == region =>
103 wri.write_record(record
104 .iter()
105 .enumerate()
106 .filter(|(i, _)| *i != 1)
107 .map(|(_, s)| s)
108 ).await?,
109 _ => {},
110 }
111 }
112 Ok(())
113}
114
115#[cfg(not(feature = "tokio"))]
116fn main() {
117 async_std::task::block_on(async {
118 if let Err(err) = filter_by_region(
119 "MA",
120 "/tmp/all_regions.csv",
121 "/tmp/MA_only.csv"
122 ).await {
123 eprintln!("error running filter_by_region: {}", err);
124 process::exit(1);
125 }
126 });
127}
128
129#[cfg(feature = "tokio")]
130fn main() {
131 tokio::runtime::Runtime::new().unwrap().block_on(async {
132 if let Err(err) = filter_by_region(
133 "MA",
134 "/tmp/all_regions.csv",
135 "/tmp/MA_only.csv"
136 ).await {
137 eprintln!("error running filter_by_region: {}", err);
138 process::exit(1);
139 }
140 });
141}
142```
143
144```no_run
145use std::error::Error;
146use std::process;
147#[cfg(feature = "with_serde")]
148use serde::{Deserialize, Serialize};
149#[cfg(not(feature = "tokio"))]
150use futures::stream::StreamExt;
151#[cfg(not(feature = "tokio"))]
152use async_std::fs::File;
153#[cfg(feature = "tokio")]
154use tokio1 as tokio;
155#[cfg(feature = "tokio")]
156use tokio_stream::StreamExt;
157#[cfg(feature = "tokio")]
158use tokio::fs::File;
159
160#[cfg(feature = "with_serde")]
161#[derive(Deserialize, Serialize)]
162struct Row {
163 city: String,
164 region: String,
165 country: String,
166 population: u64,
167}
168
169#[cfg(feature = "with_serde")]
170async fn filter_by_region_serde(region:&str, file_in:&str, file_out:&str) -> Result<(), Box<dyn Error>> {
171 // Function reads CSV file that has column named "region" at second position (index = 1).
172 // It writes to new file only rows with region equal to passed argument.
173 let mut rdr = csv_async::AsyncDeserializer::from_reader(
174 File::open(file_in).await?
175 );
176 let mut wri = csv_async::AsyncSerializer::from_writer(
177 File::create(file_out).await?
178 );
179 let mut records = rdr.deserialize::<Row>();
180 while let Some(record) = records.next().await {
181 let record = record?;
182 if record.region == region {
183 wri.serialize(&record).await?;
184 }
185 }
186 Ok(())
187}
188
189#[cfg(feature = "with_serde")]
190#[cfg(not(feature = "tokio"))]
191fn main() {
192 async_std::task::block_on(async {
193 if let Err(err) = filter_by_region_serde(
194 "MA",
195 "/tmp/all_regions.csv",
196 "/tmp/MA_only.csv"
197 ).await {
198 eprintln!("error running filter_by_region_serde: {}", err);
199 process::exit(1);
200 }
201 });
202}
203
204#[cfg(feature = "with_serde")]
205#[cfg(feature = "tokio")]
206fn main() {
207 tokio::runtime::Runtime::new().unwrap().block_on(async {
208 if let Err(err) = filter_by_region_serde(
209 "MA",
210 "/tmp/all_regions.csv",
211 "/tmp/MA_only.csv"
212 ).await {
213 eprintln!("error running filter_by_region_serde: {}", err);
214 process::exit(1);
215 }
216 });
217}
218
219#[cfg(not(feature = "with_serde"))]
220fn main() {}
221```
222*/
223
224#[cfg(feature = "tokio")]
225extern crate tokio1 as tokio;
226
#[cfg(test)]
mod tests {
    use std::error::Error;

    // Select stream and file primitives matching the chosen async runtime:
    // with the `tokio` feature use tokio + tokio-stream, otherwise async-std + futures.
    cfg_if::cfg_if! {
        if #[cfg(feature = "tokio")] {
            use tokio_stream::StreamExt;
            use tokio::fs::File;
        } else {
            use futures::stream::StreamExt;
            use async_std::fs::File;
        }
    }

    /// Writes a small sample CSV file (header row plus three data rows) at `file`.
    async fn create_async(file:&str) -> Result<(), Box<dyn Error>> {
        // Build the CSV writer and emit the header plus three data records.
        let mut wri = crate::AsyncWriter::from_writer(
            File::create(file).await?
        );
        wri.write_record(&["city","region","country","population","avg_age"]).await?;
        wri.write_record(&["Northbridge","MA","United States","14061","42.5"]).await?;
        wri.write_record(&["Westborough","MA","United States","29313", "45.1"]).await?;
        wri.write_record(&["Springfield","NJ","United States","14976", "35.0"]).await?;
        wri.flush().await?;
        Ok(())
    }

    /// Copies a CSV file record-by-record through the reader/writer pair,
    /// preserving the header row.
    async fn copy_async(file_in:&str, file_out:&str) -> Result<(), Box<dyn Error>> {
        let mut rdr = crate::AsyncReader::from_reader(
            File::open(file_in).await?
        );
        let mut wri = crate::AsyncWriter::from_writer(
            File::create(file_out).await?
        );
        wri.write_record(rdr.headers().await?.into_iter()).await?;
        let mut records = rdr.records();
        while let Some(record) = records.next().await {
            wri.write_record(&record?).await?;
        }
        Ok(())
    }

    /// Round-trip test: create a sample file, copy it via the async reader/writer,
    /// then compare hashes of the raw bytes of both files.
    #[test]
    fn test_on_files() {
        use std::io::Read;
        use std::hash::Hasher;
        std::fs::create_dir_all("examples/data").unwrap();
        let file_in = "examples/data/smallpop.csv";
        let file_out = "examples/data/smallpop_out.csv";

        // Drive the async helpers on whichever runtime the feature set selects.
        #[cfg(not(feature = "tokio"))]
        async_std::task::block_on(async {
            if let Err(err) = create_async(file_in).await {
                assert!(false, "error running create_async: {}", err);
            }
            if let Err(err) = copy_async(file_in, file_out).await {
                assert!(false, "error running copy_async: {}", err);
            }
        });
        #[cfg(feature = "tokio")]
        tokio::runtime::Runtime::new().unwrap().block_on(async {
            if let Err(err) = create_async(file_in).await {
                assert!(false, "error running create_async: {}", err);
            }
            if let Err(err) = copy_async(file_in, file_out).await {
                assert!(false, "error running copy_async: {}", err);
            }
        });

        // Hash the raw bytes of input and output; equal hashes mean a faithful copy.
        let mut bytes_in = vec![];
        std::fs::File::open(file_in).unwrap().read_to_end(&mut bytes_in).unwrap();
        let mut hasher_in = std::collections::hash_map::DefaultHasher::new();
        hasher_in.write(&bytes_in);

        let mut bytes_out = vec![];
        std::fs::File::open(file_out).unwrap().read_to_end(&mut bytes_out).unwrap();
        let mut hasher_out = std::collections::hash_map::DefaultHasher::new();
        hasher_out.write(&bytes_out);

        assert_eq!(hasher_in.finish(), hasher_out.finish(), "Copied file {} is different than source {}", file_out, file_in);

        // Clean up the temporary files created by the test.
        std::fs::remove_file(file_in).unwrap();
        std::fs::remove_file(file_out).unwrap();
    }

    // Serde-based variants of the tests above; compiled only with `with_serde`.
    cfg_if::cfg_if! {
        if #[cfg(feature = "with_serde")] {
            use serde::{Deserialize, Serialize};

            // Mirrors the columns written by `create_async`.
            #[derive(Deserialize, Serialize)]
            struct Row {
                city: String,
                region: String,
                country: String,
                population: u64,
                avg_age: f32,
            }

            /// Copies a CSV file by deserializing each record into `Row`
            /// and serializing it back out.
            async fn copy_async_serde(file_in:&str, file_out:&str) -> Result<(), Box<dyn Error>> {
                let mut rdr = crate::AsyncDeserializer::from_reader(
                    File::open(file_in).await?
                );
                let mut wri = crate::AsyncSerializer::from_writer(
                    File::create(file_out).await?
                );
                // Caution:
                // let mut records = rdr.deserialize();
                // does compile, but produce empty output (deserialize to "()" type)
                let mut records = rdr.deserialize::<Row>();
                while let Some(record) = records.next().await {
                    wri.serialize(&record?).await?;
                }
                Ok(())
            }

            /// Round-trip test through the serde deserializer/serializer;
            /// compares byte hashes of source and copy.
            #[test]
            fn test_on_files_serde() {
                use std::io::Read;
                use std::hash::Hasher;
                std::fs::create_dir_all("examples/data").unwrap();
                let file_in = "examples/data/smallpop_serde.csv";
                let file_out = "examples/data/smallpop_serde_out.csv";

                #[cfg(not(feature = "tokio"))]
                async_std::task::block_on(async {
                    if let Err(err) = create_async(file_in).await {
                        assert!(false, "error running create_async: {}", err);
                    }
                    if let Err(err) = copy_async_serde(file_in, file_out).await {
                        assert!(false, "error running copy_async_serde: {}", err);
                    }
                });
                #[cfg(feature = "tokio")]
                tokio::runtime::Runtime::new().unwrap().block_on(async {
                    if let Err(err) = create_async(file_in).await {
                        assert!(false, "error running create_async: {}", err);
                    }
                    if let Err(err) = copy_async_serde(file_in, file_out).await {
                        assert!(false, "error running copy_async_serde: {}", err);
                    }
                });

                let mut bytes_in = vec![];
                std::fs::File::open(file_in).unwrap().read_to_end(&mut bytes_in).unwrap();
                let mut hasher_in = std::collections::hash_map::DefaultHasher::new();
                hasher_in.write(&bytes_in);

                let mut bytes_out = vec![];
                std::fs::File::open(file_out).unwrap().read_to_end(&mut bytes_out).unwrap();
                let mut hasher_out = std::collections::hash_map::DefaultHasher::new();
                hasher_out.write(&bytes_out);

                assert_eq!(hasher_in.finish(), hasher_out.finish(), "Copied file {} is different than source {}", file_out, file_in);

                std::fs::remove_file(file_in).unwrap();
                std::fs::remove_file(file_out).unwrap();
            }

            /// Same round trip, but the futures are moved into a spawned task,
            /// which requires (and therefore checks) that the de/serializers are `Send`.
            #[test]
            #[cfg(not(tarpaulin))]
            fn test_on_files_serde_send() {
                use std::io::Read;
                use std::hash::Hasher;
                std::fs::create_dir_all("examples/data").unwrap();
                let file_in = "examples/data/smallpop_serde_send.csv";
                let file_out = "examples/data/smallpop_serde_send_out.csv";

                // Below code requires / check that deserializers are Send.
                #[cfg(not(feature = "tokio"))]
                {
                    let jh = async_std::task::spawn(async move {
                        if let Err(err) = create_async(file_in).await {
                            assert!(false, "error running create_async: {}", err);
                        }
                        if let Err(err) = copy_async_serde(file_in, file_out).await {
                            assert!(false, "error running copy_async_serde: {}", err);
                        }
                    });
                    async_std::task::block_on(jh);
                }
                #[cfg(feature = "tokio")]
                {
                    let rt = tokio::runtime::Runtime::new().unwrap();
                    let jh = rt.spawn(async move {
                        if let Err(err) = create_async(file_in).await {
                            assert!(false, "error running create_async: {}", err);
                        }
                        if let Err(err) = copy_async_serde(file_in, file_out).await {
                            assert!(false, "error running copy_async_serde: {}", err);
                        }
                    });
                    rt.block_on(jh).unwrap();
                }

                let mut bytes_in = vec![];
                std::fs::File::open(file_in).unwrap().read_to_end(&mut bytes_in).unwrap();
                let mut hasher_in = std::collections::hash_map::DefaultHasher::new();
                hasher_in.write(&bytes_in);

                let mut bytes_out = vec![];
                std::fs::File::open(file_out).unwrap().read_to_end(&mut bytes_out).unwrap();
                let mut hasher_out = std::collections::hash_map::DefaultHasher::new();
                hasher_out.write(&bytes_out);

                assert_eq!(hasher_in.finish(), hasher_out.finish(), "Copied file {} is different than source {}", file_out, file_in);

                std::fs::remove_file(file_in).unwrap();
                std::fs::remove_file(file_out).unwrap();
            }
        }
    }
}
439
440mod byte_record;
441mod debug;
442mod error;
443mod string_record;
444
445cfg_if::cfg_if! {
446if #[cfg(feature = "with_serde")] {
447 mod deserializer;
448 mod serializer;
449 pub use deserializer::{DeserializeError, DeserializeErrorKind};
450}}
451
452mod async_readers;
453mod async_writers;
454
455// pub mod cookbook;
456// pub mod tutorial;
457
458
459pub use crate::byte_record::{ByteRecord, ByteRecordIter, Position};
460pub use crate::error::{
461 Error, ErrorKind, FromUtf8Error, IntoInnerError, Result, Utf8Error,
462};
463pub use crate::string_record::{StringRecord, StringRecordIter};
464
465pub use crate::async_readers::AsyncReaderBuilder;
466pub use crate::async_writers::AsyncWriterBuilder;
467
468cfg_if::cfg_if! {
469if #[cfg(feature = "tokio")] {
470 pub use crate::async_readers::{
471 ardr_tokio::AsyncReader,
472 ByteRecordsIntoStream, ByteRecordsStream,
473 StringRecordsIntoStream, StringRecordsStream,
474 };
475 pub use crate::async_writers::awtr_tokio::AsyncWriter;
476} else {
477 pub use crate::async_readers::{
478 ardr_futures::AsyncReader,
479 ByteRecordsIntoStream, ByteRecordsStream,
480 StringRecordsIntoStream, StringRecordsStream,
481 };
482 pub use crate::async_writers::awtr_futures::AsyncWriter;
483}}
484
485#[cfg(all(feature = "with_serde", not(feature = "tokio")))]
486pub use crate::async_readers::{
487 ades_futures::AsyncDeserializer,
488 DeserializeRecordsStream, DeserializeRecordsIntoStream,
489 DeserializeRecordsStreamPos, DeserializeRecordsIntoStreamPos,
490};
491#[cfg(all(feature = "with_serde", not(feature = "tokio")))]
492pub use crate::async_writers::aser_futures::AsyncSerializer;
493#[cfg(all(feature = "with_serde", feature = "tokio"))]
494pub use crate::async_readers::{
495 ades_tokio::AsyncDeserializer,
496 DeserializeRecordsStream, DeserializeRecordsIntoStream,
497 DeserializeRecordsStreamPos, DeserializeRecordsIntoStreamPos,
498};
499#[cfg(all(feature = "with_serde", feature = "tokio"))]
500pub use crate::async_writers::aser_tokio::AsyncSerializer;
501
502
/// The quoting style to use when writing CSV data.
///
/// This enum is `non_exhaustive`; match with a wildcard arm in downstream code.
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub enum QuoteStyle {
    /// This puts quotes around every field. Always.
    Always,
    /// This puts quotes around fields only when necessary.
    ///
    /// They are necessary when fields contain a quote, delimiter or record
    /// terminator. Quotes are also necessary when writing an empty record
    /// (which is indistinguishable from a record with one empty field).
    ///
    /// This is the default.
    Necessary,
    /// This puts quotes around all fields that are non-numeric. Namely, when
    /// writing a field that does not parse as a valid float or integer, then
    /// quotes will be used even if they aren't strictly necessary.
    NonNumeric,
    /// This *never* writes quotes, even if it would produce invalid CSV data.
    Never,
}
524
impl QuoteStyle {
    /// Convert this to the csv_core type of the same name.
    #[allow(unreachable_patterns)]
    fn to_core(self) -> csv_core::QuoteStyle {
        match self {
            QuoteStyle::Always => csv_core::QuoteStyle::Always,
            QuoteStyle::Necessary => csv_core::QuoteStyle::Necessary,
            QuoteStyle::NonNumeric => csv_core::QuoteStyle::NonNumeric,
            QuoteStyle::Never => csv_core::QuoteStyle::Never
        }
    }
}
536
537impl Default for QuoteStyle {
538 fn default() -> QuoteStyle {
539 QuoteStyle::Necessary
540 }
541}
542
/// A record terminator.
///
/// Use this to specify the record terminator while parsing CSV. The default is
/// CRLF, which treats `\r`, `\n` or `\r\n` as a single record terminator.
///
/// This enum is `non_exhaustive`; match with a wildcard arm in downstream code.
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub enum Terminator {
    /// Parses `\r`, `\n` or `\r\n` as a single record terminator.
    CRLF,
    /// Parses the byte given as a record terminator.
    Any(u8),
}
555
556impl Terminator {
557 /// Convert this to the csv_core type of the same name.
558 #[allow(unreachable_patterns)]
559 fn to_core(self) -> csv_core::Terminator {
560 match self {
561 Terminator::CRLF => csv_core::Terminator::CRLF,
562 Terminator::Any(b) => csv_core::Terminator::Any(b)
563 }
564 }
565}
566
567impl Default for Terminator {
568 fn default() -> Terminator {
569 Terminator::CRLF
570 }
571}
572
/// The whitespace preservation behavior when reading CSV data.
///
/// This enum is `non_exhaustive`; match with a wildcard arm in downstream code.
#[derive(Clone, Copy, Debug, PartialEq)]
#[non_exhaustive]
pub enum Trim {
    /// Preserves fields and headers. This is the default.
    None,
    /// Trim whitespace from headers.
    Headers,
    /// Trim whitespace from fields, but not headers.
    Fields,
    /// Trim whitespace from fields and headers.
    All,
}
586
587impl Trim {
588 fn should_trim_fields(&self) -> bool {
589 self == &Trim::Fields || self == &Trim::All
590 }
591
592 fn should_trim_headers(&self) -> bool {
593 self == &Trim::Headers || self == &Trim::All
594 }
595}
596
597impl Default for Trim {
598 fn default() -> Trim {
599 Trim::None
600 }
601}
602