1use std::borrow::ToOwned;
13use std::sync::LazyLock;
14
15use anyhow::{anyhow, bail};
16use mz_repr::ColumnName;
17use regex::Regex;
18
19use crate::ast::{Location, Mode, Output, QueryOutput, Record, Sort, Type};
20
/// Matches a line break (`\n` or `\r\n`) followed by `----`. `parse_query` and `parse_simple`
/// split on it to separate the SQL text from the expected output.
static QUERY_OUTPUT_REGEX: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\r?\n----").unwrap());
/// Matches two consecutive line terminators, where each terminator is `\n`, `\r\n`, or the end
/// of the input. Used to find the end of a statement's SQL and of non-multiline output.
static DOUBLE_LINE_REGEX: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"(\n|\r\n|$)(\n|\r\n|$)").unwrap());
/// Matches `EOF` with a line break on each side. Used to find the end of multiline output.
static EOF_REGEX: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"(\n|\r\n)EOF(\n|\r\n)").unwrap());
25
/// Parser that reads [`Record`]s one at a time from borrowed text.
///
/// The parser never copies the input: it keeps a slice of the text that is still unread and
/// shrinks it from the front as records are consumed.
#[derive(Debug, Clone)]
pub struct Parser<'a> {
    /// The part of the input that has not been consumed yet.
    contents: &'a str,
    /// Name reported as the `file` of every [`Location`] produced by this parser.
    fname: String,
    /// Line number of the start of `contents`; starts at 1 and grows by one for every `\n`
    /// consumed.
    curline: usize,
    /// Current parse mode; starts as `Mode::Standard` and is changed by a `mode` directive.
    mode: Mode,
}
33
34impl<'a> Parser<'a> {
35 pub fn new(fname: &str, contents: &'a str) -> Self {
36 Parser {
37 contents,
38 fname: fname.to_string(),
39 curline: 1,
40 mode: Mode::Standard,
41 }
42 }
43
44 pub fn is_done(&self) -> bool {
45 self.contents.is_empty()
46 }
47
48 pub fn location(&self) -> Location {
49 Location {
50 file: self.fname.clone(),
51 line: self.curline,
52 }
53 }
54
55 fn consume(&mut self, upto: usize) {
56 for ch in self.contents[..upto].chars() {
57 if ch == '\n' {
58 self.curline += 1;
59 }
60 }
61 self.contents = &self.contents[upto..];
62 }
63
64 pub fn split_at(&mut self, sep: &Regex) -> Result<&'a str, anyhow::Error> {
65 match sep.find(self.contents) {
66 Some(found) => {
67 let result = &self.contents[..found.start()];
68 self.consume(found.end());
69 Ok(result)
70 }
71 None => bail!("Couldn't split {:?} at {:?}", self.contents, sep),
72 }
73 }
74
75 pub fn parse_record(&mut self) -> Result<Record<'a>, anyhow::Error> {
76 if self.is_done() {
77 return Ok(Record::Halt);
78 }
79
80 let line_number = self.curline;
81
82 static COMMENT_AND_LINE_REGEX: LazyLock<Regex> =
83 LazyLock::new(|| Regex::new("(#[^\n]*)?\r?(\n|$)").unwrap());
84 let first_line = self.split_at(&COMMENT_AND_LINE_REGEX)?.trim();
85
86 if first_line.is_empty() {
87 return self.parse_record();
89 }
90
91 let mut words = first_line.split(' ').peekable();
92 match words.next().unwrap() {
93 "statement" => self.parse_statement(words, first_line),
94
95 "query" => self.parse_query(words, first_line),
96
97 "simple" => self.parse_simple(words),
98
99 "hash-threshold" => {
100 let threshold = words
101 .next()
102 .ok_or_else(|| anyhow!("missing threshold in: {}", first_line))?
103 .parse::<u64>()
104 .map_err(|err| anyhow!("invalid threshold ({}) in: {}", err, first_line))?;
105 Ok(Record::HashThreshold { threshold })
106 }
107
108 "skipif" => {
110 match words.next().unwrap() {
111 "postgresql" => {
112 self.parse_record()?;
114 self.parse_record()
115 }
116 _ => self.parse_record(),
117 }
118 }
119 "onlyif" => {
120 match words.next().unwrap() {
121 "postgresql" => self.parse_record(),
122 _ => {
123 self.parse_record()?;
125 self.parse_record()
126 }
127 }
128 }
129
130 "halt" => Ok(Record::Halt),
131
132 "subtest" | "user" | "kv-batch-size" => self.parse_record(),
134
135 "mode" => {
136 self.mode = match words.next() {
137 Some("cockroach") => Mode::Cockroach,
138 Some("standard") | Some("sqlite") => Mode::Standard,
139 other => bail!("unknown parse mode: {:?}", other),
140 };
141 self.parse_record()
142 }
143
144 "copy" => Ok(Record::Copy {
145 table_name: words
146 .next()
147 .ok_or_else(|| anyhow!("load directive missing table name"))?,
148 tsv_path: words
149 .next()
150 .ok_or_else(|| anyhow!("load directive missing TSV path"))?,
151 }),
152
153 "reset-server" => Ok(Record::ResetServer),
154
155 other => bail!(
156 "Unexpected start of record on line {}: {}",
157 line_number,
158 other
159 ),
160 }
161 }
162
163 pub fn parse_records(&mut self) -> Result<Vec<Record<'a>>, anyhow::Error> {
164 let mut records = vec![];
165 loop {
166 match self.parse_record()? {
167 Record::Halt => break,
168 record => records.push(record),
169 }
170 }
171 Ok(records)
172 }
173
174 fn parse_statement(
175 &mut self,
176 mut words: impl Iterator<Item = &'a str>,
177 first_line: &'a str,
178 ) -> Result<Record<'a>, anyhow::Error> {
179 let location = self.location();
180 let mut expected_error = None;
181 let mut rows_affected = None;
182 match words.next() {
183 Some("count") => {
184 rows_affected = Some(
185 words
186 .next()
187 .ok_or_else(|| anyhow!("missing count of rows affected"))?
188 .parse::<u64>()
189 .map_err(|err| anyhow!("parsing count of rows affected: {}", err))?,
190 );
191 }
192 Some("ok") | Some("OK") => (),
193 Some("error") => expected_error = Some(parse_expected_error(first_line)),
194 _ => bail!("invalid statement disposition: {}", first_line),
195 };
196 let sql = self.split_at(&DOUBLE_LINE_REGEX)?;
197 Ok(Record::Statement {
198 expected_error,
199 rows_affected,
200 sql,
201 location,
202 })
203 }
204
205 fn parse_query(
206 &mut self,
207 mut words: std::iter::Peekable<impl Iterator<Item = &'a str>>,
208 first_line: &'a str,
209 ) -> Result<Record<'a>, anyhow::Error> {
210 let location = self.location();
211 if words.peek() == Some(&"error") {
212 let error = parse_expected_error(first_line);
213 let sql = self.split_at(&DOUBLE_LINE_REGEX)?;
214 return Ok(Record::Query {
215 sql,
216 output: Err(error),
217 location,
218 });
219 }
220
221 let types = words.next().map_or(Ok(vec![]), parse_types)?;
222 let mut sort = Sort::No;
223 let mut check_column_names = false;
224 let mut multiline = false;
225 if let Some(options) = words.next() {
226 for option in options.split(',') {
227 match option {
228 "nosort" => sort = Sort::No,
229 "rowsort" => sort = Sort::Row,
230 "valuesort" => sort = Sort::Value,
231 "colnames" => check_column_names = true,
232 "multiline" => multiline = true,
233 other => {
234 if other.starts_with("partialsort") {
235 sort = Sort::Row;
239 break;
240 } else {
241 bail!("Unrecognized option {:?} in {:?}", other, options);
242 }
243 }
244 };
245 }
246 }
247 if multiline && (check_column_names || sort.yes()) {
248 bail!("multiline option is incompatible with all other options");
249 }
250 let label = words.next();
251 static LINE_REGEX: LazyLock<Regex> = LazyLock::new(|| Regex::new("\r?(\n|$)").unwrap());
252 static HASH_REGEX: LazyLock<Regex> =
253 LazyLock::new(|| Regex::new(r"(\S+) values hashing to (\S+)").unwrap());
254 let sql = self.split_at(&QUERY_OUTPUT_REGEX)?;
255 let mut output_str = self.split_at(if multiline {
256 &EOF_REGEX
257 } else {
258 &DOUBLE_LINE_REGEX
259 })?;
260
261 output_str = if let Some(output_str_stripped) = regexp_strip_prefix(output_str, &LINE_REGEX)
265 {
266 output_str_stripped
267 } else {
268 assert!(output_str.is_empty());
273 output_str
274 };
275
276 let query_output_str = output_str;
280 let column_names = if check_column_names {
281 Some(
282 split_at(&mut output_str, &LINE_REGEX)?
283 .split(' ')
284 .filter(|s| !s.is_empty())
285 .map(|s| ColumnName::from(s.replace('␠', " ")))
286 .collect(),
287 )
288 } else {
289 None
290 };
291 let output = match HASH_REGEX.captures(output_str) {
292 Some(captures) => Output::Hashed {
293 num_values: captures.get(1).unwrap().as_str().parse::<usize>()?,
294 md5: captures.get(2).unwrap().as_str().to_owned(),
295 },
296 None => {
297 if multiline {
298 Output::Values(vec![output_str.to_owned()])
299 } else if output_str.starts_with('\r') || output_str.starts_with('\n') {
300 Output::Values(vec![])
301 } else {
302 let mut vals: Vec<String> = output_str.lines().map(|s| s.to_owned()).collect();
303 match self.mode {
304 Mode::Standard => {
305 if !multiline {
306 vals = vals.into_iter().map(|val| val.replace('⏎', "\n")).collect();
307 }
308 }
309 Mode::Cockroach => {
310 let mut rows: Vec<Vec<String>> = vec![];
311 for line in vals {
312 let cols = split_cols(&line, types.len());
313 if sort != Sort::No && cols.len() != types.len() {
314 bail!(
320 "col len ({}) did not match declared col len ({})",
321 cols.len(),
322 types.len()
323 );
324 }
325 rows.push(
326 cols.into_iter()
327 .map(|col| {
328 let mut col = col.replace('␠', " ");
329 if !multiline {
330 col = col.replace('⏎', "\n");
331 }
332 col
333 })
334 .collect(),
335 );
336 }
337 if sort == Sort::Row {
338 rows.sort();
339 }
340 vals = rows.into_iter().flatten().collect();
341 if sort == Sort::Value {
342 vals.sort();
343 }
344 }
345 }
346 Output::Values(vals)
347 }
348 }
349 };
350 Ok(Record::Query {
351 sql,
352 output: Ok(QueryOutput {
353 types,
354 sort,
355 multiline,
356 label,
357 column_names,
358 mode: self.mode,
359 output,
360 output_str: query_output_str,
361 }),
362 location,
363 })
364 }
365
366 fn parse_simple(
367 &mut self,
368 mut words: std::iter::Peekable<impl Iterator<Item = &'a str>>,
369 ) -> Result<Record<'a>, anyhow::Error> {
370 let location = self.location();
371 let mut conn = None;
372 let mut user = None;
373 let mut multiline = false;
374 if let Some(options) = words.next() {
375 for option in options.split(',') {
376 if let Some(value) = option.strip_prefix("conn=") {
377 conn = Some(value);
378 } else if let Some(value) = option.strip_prefix("user=") {
379 user = Some(value);
380 } else if option == "multiline" {
381 multiline = true;
382 } else {
383 bail!("Unrecognized option {:?} in {:?}", option, options);
384 }
385 }
386 }
387 if user.is_some() && conn.is_none() {
388 bail!("cannot set user without also setting conn");
389 }
390 let sql = self.split_at(&QUERY_OUTPUT_REGEX)?;
391 let output_str = self
392 .split_at(if multiline {
393 &EOF_REGEX
394 } else {
395 &DOUBLE_LINE_REGEX
396 })?
397 .trim_start();
398 let output = if multiline {
399 Output::Values({
400 let mut v = vec![output_str.to_owned()];
401 let complete_str = self.split_at(&DOUBLE_LINE_REGEX)?.trim_start();
403 v.extend(complete_str.lines().map(String::from));
404 v
405 })
406 } else {
407 Output::Values(output_str.lines().map(String::from).collect())
408 };
409 Ok(Record::Simple {
410 location,
411 conn,
412 user,
413 sql,
414 output,
415 output_str,
416 })
417 }
418}
419
420fn split_at<'a>(input: &mut &'a str, sep: &Regex) -> Result<&'a str, anyhow::Error> {
421 match sep.find(input) {
422 Some(found) => {
423 let result = &input[..found.start()];
424 *input = &input[found.end()..];
425 Ok(result)
426 }
427 None => bail!("Couldn't split {:?} at {:?}", input, sep),
428 }
429}
430
431fn parse_types(input: &str) -> Result<Vec<Type>, anyhow::Error> {
433 input
434 .chars()
435 .map(|char| {
436 Ok(match char {
437 'T' => Type::Text,
438 'I' => Type::Integer,
439 'R' => Type::Real,
440 'B' => Type::Bool,
441 'O' => Type::Oid,
442 _ => bail!("Unexpected type char {} in: {}", char, input),
443 })
444 })
445 .collect()
446}
447
448fn parse_expected_error(line: &str) -> &str {
449 static PGCODE_RE: LazyLock<Regex> =
450 LazyLock::new(|| Regex::new("(statement|query) error( pgcode [a-zA-Z0-9]{5})? ?").unwrap());
451 let pos = PGCODE_RE.find(line).unwrap().end();
454 &line[pos..]
455}
456
/// Splits one line of expected output into column values.
///
/// When exactly one column is expected the whole line, trimmed of surrounding whitespace, is
/// the single value, so spaces inside it are kept. Otherwise the line is split on runs of
/// whitespace, however many columns that produces.
pub(crate) fn split_cols(line: &str, expected_columns: usize) -> Vec<&str> {
    match expected_columns {
        1 => vec![line.trim()],
        _ => line.split_whitespace().collect(),
    }
}
469
470pub fn regexp_strip_prefix<'a>(text: &'a str, regexp: &Regex) -> Option<&'a str> {
471 match regexp.find(text) {
472 Some(found) => {
473 if found.start() == 0 {
474 Some(&text[found.end()..])
475 } else {
476 None
477 }
478 }
479 None => None,
480 }
481}