csv_async/lib.rs

#![deny(missing_docs)]

// A few unsafe lines are in src/string_record.rs
// #![deny(unsafe_code)]

/*!
The `csv-async` crate provides a fast and flexible CSV reader and writer,
intended to be used in an asynchronous environment, i.e. inside `async`
functions executed as tasks by an executor.
This library is not tied to any particular executor;
unit tests and documentation snippets use either the `async-std` or the `tokio` crate.
A synchronous interface for reading and writing CSV files is not part of this crate;
please use the `csv` crate for that. This crate attempts to mimic the `csv` crate API,
but there are some exceptions, e.g. configuration builders have `create_...` factory
functions instead of the `from_...` functions used in the `csv` crate.

# Brief overview

The primary types in this crate are
[`AsyncReader`](struct.AsyncReader.html)
and
[`AsyncWriter`](struct.AsyncWriter.html),
for reading and writing CSV data respectively,
or [`AsyncDeserializer`](struct.AsyncDeserializer.html)
and
[`AsyncSerializer`](struct.AsyncSerializer.html),
for reading and writing CSV data using the interfaces generated by the `serde_derive` macros.

Additionally, to support CSV data with custom field or record delimiters
(among many other things), you should use either an
[`AsyncReaderBuilder`](struct.AsyncReaderBuilder.html)
or an
[`AsyncWriterBuilder`](struct.AsyncWriterBuilder.html),
depending on whether you're reading or writing CSV data.

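For example, here is a minimal sketch (assuming the reader builder's factory
function is named `create_reader`, following the `create_...` convention noted
above) of configuring a reader for semicolon-delimited data held in memory:

```no_run
// A sketch only: `&[u8]` implements the async read traits used by this crate,
// so in-memory data can stand in for a file.
let data: &[u8] = b"city;country\nWarsaw;Poland\n";
let _rdr = csv_async::AsyncReaderBuilder::new()
    .delimiter(b';')
    .create_reader(data);
```
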
The standard CSV record types are
[`StringRecord`](struct.StringRecord.html)
and
[`ByteRecord`](struct.ByteRecord.html).
`StringRecord` should be used when you know your data to be valid UTF-8.
For data that may be invalid UTF-8, `ByteRecord` is suitable.

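A `StringRecord` can also be built and inspected directly; a small sketch
(assuming the record API mirrors the `csv` crate's `StringRecord`):

```no_run
let mut record = csv_async::StringRecord::new();
record.push_field("Southborough");
record.push_field("MA");
// Fields are addressed by zero-based index; out-of-range access returns `None`.
assert_eq!(record.get(1), Some("MA"));
assert_eq!(record.get(5), None);
```
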
Finally, the set of errors is described by the
[`Error`](struct.Error.html)
type.

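As a sketch of error handling (assuming `Error::kind` and `ErrorKind` mirror
the `csv` crate), errors can be classified by their kind:

```no_run
// Hypothetical helper, for illustration only.
fn describe(err: &csv_async::Error) -> &'static str {
    match err.kind() {
        csv_async::ErrorKind::Io(_) => "I/O error",
        csv_async::ErrorKind::Utf8 { .. } => "invalid UTF-8 in a record",
        _ => "other CSV error",
    }
}
```
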
The rest of the types in this crate mostly correspond to more detailed errors,
position information, configuration knobs or iterator types.

# Setup

In the root folder of your project, run `cargo add csv-async` or
`cargo add --features tokio csv-async` to add this crate to your project.

# Examples

The examples below show how to read and write a CSV file in an asynchronous
context and how to access details of individual records.

Sample input file:
```csv
city,region,country,population
Southborough,MA,United States,9686
Northbridge,MA,United States,14061
Marlborough,MA,United States,38334
Springfield,MA,United States,152227
Springfield,MO,United States,150443
Springfield,NJ,United States,14976
Concord,NH,United States,42605
```

```no_run
use std::error::Error;
use std::process;
#[cfg(not(feature = "tokio"))]
use futures::stream::StreamExt;
#[cfg(not(feature = "tokio"))]
use async_std::fs::File;
#[cfg(feature = "tokio")]
use tokio1 as tokio;
#[cfg(feature = "tokio")]
use tokio_stream::StreamExt;
#[cfg(feature = "tokio")]
use tokio::fs::File;

async fn filter_by_region(region: &str, file_in: &str, file_out: &str) -> Result<(), Box<dyn Error>> {
    // This function reads a CSV file whose second column (index = 1) is named "region".
    // It writes to a new file only the rows whose region equals the passed argument,
    // and removes the region column.
    let mut rdr = csv_async::AsyncReader::from_reader(
        File::open(file_in).await?
    );
    let mut wri = csv_async::AsyncWriter::from_writer(
        File::create(file_out).await?
    );
    wri.write_record(rdr
        .headers()
        .await?.into_iter()
        .filter(|h| *h != "region")
    ).await?;
    let mut records = rdr.records();
    while let Some(record) = records.next().await {
        let record = record?;
        match record.get(1) {
            Some(reg) if reg == region =>
                wri.write_record(record
                    .iter()
                    .enumerate()
                    .filter(|(i, _)| *i != 1)
                    .map(|(_, s)| s)
                ).await?,
            _ => {},
        }
    }
    Ok(())
}

#[cfg(not(feature = "tokio"))]
fn main() {
    async_std::task::block_on(async {
        if let Err(err) = filter_by_region(
            "MA",
            "/tmp/all_regions.csv",
            "/tmp/MA_only.csv"
        ).await {
            eprintln!("error running filter_by_region: {}", err);
            process::exit(1);
        }
    });
}

#[cfg(feature = "tokio")]
fn main() {
    tokio::runtime::Runtime::new().unwrap().block_on(async {
        if let Err(err) = filter_by_region(
            "MA",
            "/tmp/all_regions.csv",
            "/tmp/MA_only.csv"
        ).await {
            eprintln!("error running filter_by_region: {}", err);
            process::exit(1);
        }
    });
}
```

```no_run
use std::error::Error;
use std::process;
#[cfg(feature = "with_serde")]
use serde::{Deserialize, Serialize};
#[cfg(not(feature = "tokio"))]
use futures::stream::StreamExt;
#[cfg(not(feature = "tokio"))]
use async_std::fs::File;
#[cfg(feature = "tokio")]
use tokio1 as tokio;
#[cfg(feature = "tokio")]
use tokio_stream::StreamExt;
#[cfg(feature = "tokio")]
use tokio::fs::File;

#[cfg(feature = "with_serde")]
#[derive(Deserialize, Serialize)]
struct Row {
    city: String,
    region: String,
    country: String,
    population: u64,
}

#[cfg(feature = "with_serde")]
async fn filter_by_region_serde(region: &str, file_in: &str, file_out: &str) -> Result<(), Box<dyn Error>> {
    // This function reads a CSV file whose second column (index = 1) is named "region".
    // It writes to a new file only the rows whose region equals the passed argument.
    let mut rdr = csv_async::AsyncDeserializer::from_reader(
        File::open(file_in).await?
    );
    let mut wri = csv_async::AsyncSerializer::from_writer(
        File::create(file_out).await?
    );
    let mut records = rdr.deserialize::<Row>();
    while let Some(record) = records.next().await {
        let record = record?;
        if record.region == region {
            wri.serialize(&record).await?;
        }
    }
    Ok(())
}

#[cfg(feature = "with_serde")]
#[cfg(not(feature = "tokio"))]
fn main() {
    async_std::task::block_on(async {
        if let Err(err) = filter_by_region_serde(
            "MA",
            "/tmp/all_regions.csv",
            "/tmp/MA_only.csv"
        ).await {
            eprintln!("error running filter_by_region_serde: {}", err);
            process::exit(1);
        }
    });
}

#[cfg(feature = "with_serde")]
#[cfg(feature = "tokio")]
fn main() {
    tokio::runtime::Runtime::new().unwrap().block_on(async {
        if let Err(err) = filter_by_region_serde(
            "MA",
            "/tmp/all_regions.csv",
            "/tmp/MA_only.csv"
        ).await {
            eprintln!("error running filter_by_region_serde: {}", err);
            process::exit(1);
        }
    });
}

#[cfg(not(feature = "with_serde"))]
fn main() {}
```
*/

#[cfg(feature = "tokio")]
extern crate tokio1 as tokio;

#[cfg(test)]
mod tests {
    use std::error::Error;

    cfg_if::cfg_if! {
        if #[cfg(feature = "tokio")] {
            use tokio_stream::StreamExt;
            use tokio::fs::File;
        } else {
            use futures::stream::StreamExt;
            use async_std::fs::File;
        }
    }

    async fn create_async(file: &str) -> Result<(), Box<dyn Error>> {
        // Build the CSV writer and write a header row plus a few data records.
        let mut wri = crate::AsyncWriter::from_writer(
            File::create(file).await?
        );
        wri.write_record(&["city","region","country","population","avg_age"]).await?;
        wri.write_record(&["Northbridge","MA","United States","14061","42.5"]).await?;
        wri.write_record(&["Westborough","MA","United States","29313", "45.1"]).await?;
        wri.write_record(&["Springfield","NJ","United States","14976", "35.0"]).await?;
        wri.flush().await?;
        Ok(())
    }

    async fn copy_async(file_in: &str, file_out: &str) -> Result<(), Box<dyn Error>> {
        let mut rdr = crate::AsyncReader::from_reader(
            File::open(file_in).await?
        );
        let mut wri = crate::AsyncWriter::from_writer(
            File::create(file_out).await?
        );
        wri.write_record(rdr.headers().await?.into_iter()).await?;
        let mut records = rdr.records();
        while let Some(record) = records.next().await {
            wri.write_record(&record?).await?;
        }
        Ok(())
    }

    #[test]
    fn test_on_files() {
        use std::io::Read;
        use std::hash::Hasher;
        std::fs::create_dir_all("examples/data").unwrap();
        let file_in  = "examples/data/smallpop.csv";
        let file_out = "examples/data/smallpop_out.csv";

        #[cfg(not(feature = "tokio"))]
        async_std::task::block_on(async {
            if let Err(err) = create_async(file_in).await {
                assert!(false, "error running create_async: {}", err);
            }
            if let Err(err) = copy_async(file_in, file_out).await {
                assert!(false, "error running copy_async: {}", err);
            }
        });
        #[cfg(feature = "tokio")]
        tokio::runtime::Runtime::new().unwrap().block_on(async {
            if let Err(err) = create_async(file_in).await {
                assert!(false, "error running create_async: {}", err);
            }
            if let Err(err) = copy_async(file_in, file_out).await {
                assert!(false, "error running copy_async: {}", err);
            }
        });

        let mut bytes_in  = vec![];
        std::fs::File::open(file_in).unwrap().read_to_end(&mut bytes_in).unwrap();
        let mut hasher_in = std::collections::hash_map::DefaultHasher::new();
        hasher_in.write(&bytes_in);

        let mut bytes_out = vec![];
        std::fs::File::open(file_out).unwrap().read_to_end(&mut bytes_out).unwrap();
        let mut hasher_out = std::collections::hash_map::DefaultHasher::new();
        hasher_out.write(&bytes_out);

        assert_eq!(hasher_in.finish(), hasher_out.finish(), "Copied file {} is different from source {}", file_out, file_in);

        std::fs::remove_file(file_in).unwrap();
        std::fs::remove_file(file_out).unwrap();
    }

    cfg_if::cfg_if! {
        if #[cfg(feature = "with_serde")] {
            use serde::{Deserialize, Serialize};

            #[derive(Deserialize, Serialize)]
            struct Row {
                city: String,
                region: String,
                country: String,
                population: u64,
                avg_age: f32,
            }

            async fn copy_async_serde(file_in: &str, file_out: &str) -> Result<(), Box<dyn Error>> {
                let mut rdr = crate::AsyncDeserializer::from_reader(
                    File::open(file_in).await?
                );
                let mut wri = crate::AsyncSerializer::from_writer(
                    File::create(file_out).await?
                );
                // Caution:
                // let mut records = rdr.deserialize();
                // does compile, but produces empty output (it deserializes into the `()` type).
                let mut records = rdr.deserialize::<Row>();
                while let Some(record) = records.next().await {
                    wri.serialize(&record?).await?;
                }
                Ok(())
            }

            #[test]
            fn test_on_files_serde() {
                use std::io::Read;
                use std::hash::Hasher;
                std::fs::create_dir_all("examples/data").unwrap();
                let file_in  = "examples/data/smallpop_serde.csv";
                let file_out = "examples/data/smallpop_serde_out.csv";

                #[cfg(not(feature = "tokio"))]
                async_std::task::block_on(async {
                    if let Err(err) = create_async(file_in).await {
                        assert!(false, "error running create_async: {}", err);
                    }
                    if let Err(err) = copy_async_serde(file_in, file_out).await {
                        assert!(false, "error running copy_async_serde: {}", err);
                    }
                });
                #[cfg(feature = "tokio")]
                tokio::runtime::Runtime::new().unwrap().block_on(async {
                    if let Err(err) = create_async(file_in).await {
                        assert!(false, "error running create_async: {}", err);
                    }
                    if let Err(err) = copy_async_serde(file_in, file_out).await {
                        assert!(false, "error running copy_async_serde: {}", err);
                    }
                });

                let mut bytes_in  = vec![];
                std::fs::File::open(file_in).unwrap().read_to_end(&mut bytes_in).unwrap();
                let mut hasher_in = std::collections::hash_map::DefaultHasher::new();
                hasher_in.write(&bytes_in);

                let mut bytes_out = vec![];
                std::fs::File::open(file_out).unwrap().read_to_end(&mut bytes_out).unwrap();
                let mut hasher_out = std::collections::hash_map::DefaultHasher::new();
                hasher_out.write(&bytes_out);

                assert_eq!(hasher_in.finish(), hasher_out.finish(), "Copied file {} is different from source {}", file_out, file_in);

                std::fs::remove_file(file_in).unwrap();
                std::fs::remove_file(file_out).unwrap();
            }

            #[test]
            #[cfg(not(tarpaulin))]
            fn test_on_files_serde_send() {
                use std::io::Read;
                use std::hash::Hasher;
                std::fs::create_dir_all("examples/data").unwrap();
                let file_in  = "examples/data/smallpop_serde_send.csv";
                let file_out = "examples/data/smallpop_serde_send_out.csv";

                // The code below requires (and thereby checks) that the deserializers are `Send`.
                #[cfg(not(feature = "tokio"))]
                {
                    let jh = async_std::task::spawn(async move {
                        if let Err(err) = create_async(file_in).await {
                            assert!(false, "error running create_async: {}", err);
                        }
                        if let Err(err) = copy_async_serde(file_in, file_out).await {
                            assert!(false, "error running copy_async_serde: {}", err);
                        }
                    });
                    async_std::task::block_on(jh);
                }
                #[cfg(feature = "tokio")]
                {
                    let rt = tokio::runtime::Runtime::new().unwrap();
                    let jh = rt.spawn(async move {
                        if let Err(err) = create_async(file_in).await {
                            assert!(false, "error running create_async: {}", err);
                        }
                        if let Err(err) = copy_async_serde(file_in, file_out).await {
                            assert!(false, "error running copy_async_serde: {}", err);
                        }
                    });
                    rt.block_on(jh).unwrap();
                }

                let mut bytes_in  = vec![];
                std::fs::File::open(file_in).unwrap().read_to_end(&mut bytes_in).unwrap();
                let mut hasher_in = std::collections::hash_map::DefaultHasher::new();
                hasher_in.write(&bytes_in);

                let mut bytes_out = vec![];
                std::fs::File::open(file_out).unwrap().read_to_end(&mut bytes_out).unwrap();
                let mut hasher_out = std::collections::hash_map::DefaultHasher::new();
                hasher_out.write(&bytes_out);

                assert_eq!(hasher_in.finish(), hasher_out.finish(), "Copied file {} is different from source {}", file_out, file_in);

                std::fs::remove_file(file_in).unwrap();
                std::fs::remove_file(file_out).unwrap();
            }
        }
    }
}

mod byte_record;
mod debug;
mod error;
mod string_record;

cfg_if::cfg_if! {
if #[cfg(feature = "with_serde")] {
    mod deserializer;
    mod serializer;
    pub use deserializer::{DeserializeError, DeserializeErrorKind};
}}

mod async_readers;
mod async_writers;

// pub mod cookbook;
// pub mod tutorial;


pub use crate::byte_record::{ByteRecord, ByteRecordIter, Position};
pub use crate::error::{
    Error, ErrorKind, FromUtf8Error, IntoInnerError, Result, Utf8Error,
};
pub use crate::string_record::{StringRecord, StringRecordIter};

pub use crate::async_readers::AsyncReaderBuilder;
pub use crate::async_writers::AsyncWriterBuilder;

cfg_if::cfg_if! {
if #[cfg(feature = "tokio")] {
    pub use crate::async_readers::{
        ardr_tokio::AsyncReader,
        ByteRecordsIntoStream, ByteRecordsStream,
        StringRecordsIntoStream, StringRecordsStream,
    };
    pub use crate::async_writers::awtr_tokio::AsyncWriter;
} else {
    pub use crate::async_readers::{
        ardr_futures::AsyncReader,
        ByteRecordsIntoStream, ByteRecordsStream,
        StringRecordsIntoStream, StringRecordsStream,
    };
    pub use crate::async_writers::awtr_futures::AsyncWriter;
}}

#[cfg(all(feature = "with_serde", not(feature = "tokio")))]
pub use crate::async_readers::{
    ades_futures::AsyncDeserializer,
    DeserializeRecordsStream, DeserializeRecordsIntoStream,
    DeserializeRecordsStreamPos, DeserializeRecordsIntoStreamPos,
};
#[cfg(all(feature = "with_serde", not(feature = "tokio")))]
pub use crate::async_writers::aser_futures::AsyncSerializer;
#[cfg(all(feature = "with_serde", feature = "tokio"))]
pub use crate::async_readers::{
    ades_tokio::AsyncDeserializer,
    DeserializeRecordsStream, DeserializeRecordsIntoStream,
    DeserializeRecordsStreamPos, DeserializeRecordsIntoStreamPos,
};
#[cfg(all(feature = "with_serde", feature = "tokio"))]
pub use crate::async_writers::aser_tokio::AsyncSerializer;

/// The quoting style to use when writing CSV data.
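///
/// # Example
///
/// A minimal sketch (assuming [`AsyncWriterBuilder`](struct.AsyncWriterBuilder.html)
/// exposes a `quote_style` option mirroring the `csv` crate's writer builder):
///
/// ```no_run
/// let mut builder = csv_async::AsyncWriterBuilder::new();
/// // Put quotes around every written field, whether or not they are needed.
/// builder.quote_style(csv_async::QuoteStyle::Always);
/// ```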
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub enum QuoteStyle {
    /// This puts quotes around every field. Always.
    Always,
    /// This puts quotes around fields only when necessary.
    ///
    /// They are necessary when fields contain a quote, delimiter or record
    /// terminator. Quotes are also necessary when writing an empty record
    /// (which is indistinguishable from a record with one empty field).
    ///
    /// This is the default.
    Necessary,
    /// This puts quotes around all fields that are non-numeric. Namely, when
    /// writing a field that does not parse as a valid float or integer, then
    /// quotes will be used even if they aren't strictly necessary.
    NonNumeric,
    /// This *never* writes quotes, even if it would produce invalid CSV data.
    Never,
}

impl QuoteStyle {
    #[allow(unreachable_patterns)]
    fn to_core(self) -> csv_core::QuoteStyle {
        match self {
            QuoteStyle::Always => csv_core::QuoteStyle::Always,
            QuoteStyle::Necessary => csv_core::QuoteStyle::Necessary,
            QuoteStyle::NonNumeric => csv_core::QuoteStyle::NonNumeric,
            QuoteStyle::Never => csv_core::QuoteStyle::Never,
        }
    }
}

impl Default for QuoteStyle {
    fn default() -> QuoteStyle {
        QuoteStyle::Necessary
    }
}

/// A record terminator.
///
/// Use this to specify the record terminator while parsing CSV. The default is
/// CRLF, which treats `\r`, `\n` or `\r\n` as a single record terminator.
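///
/// # Example
///
/// A minimal sketch (assuming [`AsyncReaderBuilder`](struct.AsyncReaderBuilder.html)
/// exposes a `terminator` option mirroring the `csv` crate's reader builder):
///
/// ```no_run
/// let mut builder = csv_async::AsyncReaderBuilder::new();
/// // Treat `;` as the record terminator instead of CRLF.
/// builder.terminator(csv_async::Terminator::Any(b';'));
/// ```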
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub enum Terminator {
    /// Parses `\r`, `\n` or `\r\n` as a single record terminator.
    CRLF,
    /// Parses the byte given as a record terminator.
    Any(u8),
}

impl Terminator {
    /// Convert this to the csv_core type of the same name.
    #[allow(unreachable_patterns)]
    fn to_core(self) -> csv_core::Terminator {
        match self {
            Terminator::CRLF => csv_core::Terminator::CRLF,
            Terminator::Any(b) => csv_core::Terminator::Any(b),
        }
    }
}

impl Default for Terminator {
    fn default() -> Terminator {
        Terminator::CRLF
    }
}

/// The whitespace preservation behavior when reading CSV data.
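///
/// # Example
///
/// A minimal sketch (assuming [`AsyncReaderBuilder`](struct.AsyncReaderBuilder.html)
/// exposes a `trim` option mirroring the `csv` crate's reader builder):
///
/// ```no_run
/// let mut builder = csv_async::AsyncReaderBuilder::new();
/// // Trim leading and trailing whitespace from both headers and fields.
/// builder.trim(csv_async::Trim::All);
/// ```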
#[derive(Clone, Copy, Debug, PartialEq)]
#[non_exhaustive]
pub enum Trim {
    /// Preserves fields and headers. This is the default.
    None,
    /// Trim whitespace from headers.
    Headers,
    /// Trim whitespace from fields, but not headers.
    Fields,
    /// Trim whitespace from fields and headers.
    All,
}

impl Trim {
    fn should_trim_fields(&self) -> bool {
        self == &Trim::Fields || self == &Trim::All
    }

    fn should_trim_headers(&self) -> bool {
        self == &Trim::Headers || self == &Trim::All
    }
}

impl Default for Trim {
    fn default() -> Trim {
        Trim::None
    }
}