// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
use mz_repr::{Datum, Row};
use mz_storage_types::errors::DecodeErrorKind;
use mz_storage_types::sources::encoding::CsvEncoding;
/// State of an incremental CSV decoder built on `csv_core`.
///
/// The state is retained between calls to `decode`, so a record whose bytes
/// are split across several input chunks is accumulated in `output`/`ends`
/// until it is complete.
#[derive(Debug)]
pub struct CsvDecoderState {
/// When true, the next completed record is treated as a header row: it is
/// compared against `header_names` and is not returned to the caller.
next_row_is_header: bool,
/// Expected header column names, if the encoding declares a header row.
header_names: Option<Vec<String>>,
/// Number of fields every record is required to have.
n_cols: usize,
/// Buffer that `csv_core` writes the field bytes of the current record into.
/// It starts with one byte and is doubled whenever the reader reports it full.
output: Vec<u8>,
/// Number of bytes at the front of `output` already filled for the record
/// that is currently being read.
output_cursor: usize,
/// End offsets of the fields of the current record. Slot 0 is left at 0 so
/// that field `i` occupies `output[ends[i]..ends[i + 1]]`.
ends: Vec<usize>,
/// Index of the next slot of `ends` to be written; it is 1 when no field of
/// the current record has been completed yet.
ends_cursor: usize,
/// The underlying `csv_core` reader, configured with the encoding's delimiter.
csv_reader: csv_core::Reader,
/// Reusable row that is re-packed for each decoded record and then cloned
/// for the caller.
row_buf: Row,
/// Number of records that failed the column-count or UTF-8 checks.
events_error: usize,
/// Number of records that were decoded into a row successfully.
events_success: usize,
}
impl CsvDecoderState {
    /// Returns the number of records counted so far: those that failed the
    /// column-count or UTF-8 checks plus those decoded successfully.
    fn total_events(&self) -> usize {
        self.events_error + self.events_success
    }

    /// Creates a decoder for the given CSV encoding.
    ///
    /// The expected column count and the optional header names are taken from
    /// `format.columns`; the field delimiter is taken from `format.delimiter`.
    /// If header names are present, the first record decoded is treated as a
    /// header row.
    pub fn new(format: CsvEncoding) -> Self {
        let CsvEncoding { columns, delimiter } = format;
        let n_cols = columns.arity();
        let header_names = columns.into_header_names();
        Self {
            next_row_is_header: header_names.is_some(),
            header_names,
            n_cols,
            // Both scratch buffers start with a single slot and are doubled on
            // demand by `decode`.
            output: vec![0],
            output_cursor: 0,
            // `ends[0]` stays 0 so that field `i` spans `ends[i]..ends[i + 1]`;
            // the reader therefore writes from slot 1 onwards.
            ends: vec![0],
            ends_cursor: 1,
            csv_reader: csv_core::ReaderBuilder::new().delimiter(delimiter).build(),
            row_buf: Row::default(),
            events_error: 0,
            events_success: 0,
        }
    }

    /// Prepares the decoder for the start of a new object.
    ///
    /// If the encoding declares header names, the next record decoded is again
    /// treated as a header row. Nothing else is reset.
    pub fn reset_for_new_object(&mut self) {
        if self.header_names.is_some() {
            self.next_row_is_header = true;
        }
    }

    /// Turns the completed record buffered in `output`/`ends` into a row.
    ///
    /// `n_fields` is the number of fields the reader produced for the record.
    /// Whatever the outcome, the buffer cursors are rewound afterwards so the
    /// next record starts from a clean buffer.
    ///
    /// # Errors
    ///
    /// Returns `DecodeErrorKind::Text` if `n_fields` differs from the expected
    /// column count or if the record's bytes are not valid UTF-8.
    fn finish_record(&mut self, n_fields: usize) -> Result<Option<Row>, DecodeErrorKind> {
        let result = if n_fields != self.n_cols {
            self.events_error += 1;
            Err(DecodeErrorKind::Text(
                format!(
                    "CSV error at record number {}: expected {} columns, got {}.",
                    self.total_events(),
                    self.n_cols,
                    n_fields
                )
                .into(),
            ))
        } else {
            match std::str::from_utf8(&self.output[0..self.output_cursor]) {
                Ok(output) => {
                    self.events_success += 1;
                    let mut row_packer = self.row_buf.packer();
                    row_packer.extend(
                        (0..self.n_cols)
                            .map(|i| Datum::String(&output[self.ends[i]..self.ends[i + 1]])),
                    );
                    Ok(Some(self.row_buf.clone()))
                }
                Err(e) => {
                    self.events_error += 1;
                    Err(DecodeErrorKind::Text(
                        format!(
                            "CSV error at record number {}: invalid UTF-8 ({})",
                            self.total_events(),
                            e
                        )
                        .into(),
                    ))
                }
            }
        };
        // Rewind on failure as well as on success. If a rejected record were
        // left in the buffers, the next record would be appended after it:
        // its field count would be added to the stale one and its offsets
        // would be mixed with the stale offsets, yielding spurious errors or
        // an out-of-order slice range.
        self.output_cursor = 0;
        self.ends_cursor = 1;
        result
    }

    /// Decodes at most one record from `chunk`.
    ///
    /// `chunk` is advanced past every byte the reader consumed. Partial
    /// records are kept in the decoder, so the remaining bytes of a record can
    /// be supplied by a later call.
    ///
    /// Returns `Ok(Some(row))` with one string datum per column when a data
    /// record was completed. Returns `Ok(None)` when the input ran out before
    /// a record was completed, when the reader finished a record with no
    /// fields, or when a header row was consumed and no input remains.
    ///
    /// # Errors
    ///
    /// Returns `DecodeErrorKind::Text` when a record has the wrong number of
    /// columns, when it is not valid UTF-8, or when a header row does not
    /// match the expected header names.
    pub fn decode(&mut self, chunk: &mut &[u8]) -> Result<Option<Row>, DecodeErrorKind> {
        loop {
            let (result, n_input, n_output, n_ends) = self.csv_reader.read_record(
                *chunk,
                &mut self.output[self.output_cursor..],
                &mut self.ends[self.ends_cursor..],
            );
            self.output_cursor += n_output;
            *chunk = &(*chunk)[n_input..];
            self.ends_cursor += n_ends;
            match result {
                // The record is not complete yet: either more input is needed
                // or one of the scratch buffers has to grow before retrying.
                csv_core::ReadRecordResult::InputEmpty => break Ok(None),
                csv_core::ReadRecordResult::OutputFull => {
                    self.output.resize(self.output.len() * 2, 0);
                }
                csv_core::ReadRecordResult::OutputEndsFull => {
                    self.ends.resize(self.ends.len() * 2, 0);
                }
                // The reader finished a record (or reached the end of data).
                csv_core::ReadRecordResult::Record | csv_core::ReadRecordResult::End => {
                    let ends_valid = self.ends_cursor - 1;
                    if ends_valid == 0 {
                        // No fields were produced, so there is nothing to emit.
                        break Ok(None);
                    }
                    let result = self.finish_record(ends_valid);

                    // Skip header rows, do not send them into dataflow.
                    if self.next_row_is_header {
                        self.next_row_is_header = false;
                        if let Ok(Some(row)) = &result {
                            let mismatched = row
                                .iter()
                                .zip(self.header_names.iter().flatten())
                                .enumerate()
                                .find(|(_, (actual, expected))| actual.unwrap_str() != &**expected);
                            if let Some((i, (actual, expected))) = mismatched {
                                break Err(DecodeErrorKind::Text(
                                    format!(
                                        "source file contains incorrect columns '{:?}', \
                                         first mismatched column at index {} expected={} actual={}",
                                        row,
                                        i + 1,
                                        expected,
                                        actual
                                    )
                                    .into(),
                                ));
                            }
                        }
                        // NOTE(review): if the header record itself failed to
                        // decode and the chunk is now empty, the error is
                        // dropped and `Ok(None)` is returned. Kept as is;
                        // confirm this is intended.
                        if chunk.is_empty() {
                            break Ok(None);
                        } else if result.is_err() {
                            break result;
                        }
                        // Otherwise loop again to read the first data record.
                    } else {
                        break result;
                    }
                }
            }
        }
    }
}