1use std::borrow::ToOwned;
13use std::sync::LazyLock;
14
15use anyhow::{anyhow, bail};
16use mz_repr::ColumnName;
17use regex::Regex;
18
19use crate::ast::{Location, Mode, Output, QueryOutput, Record, Sort, Type};
20
/// Matches a line break followed by `----`; `parse_query` and `parse_simple` split on it to
/// separate a record's SQL text from its expected output.
static QUERY_OUTPUT_REGEX: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\r?\n----").unwrap());
/// Matches two consecutive line terminators, where each terminator is `\n`, `\r\n`, or the end
/// of the input. The record parsers split on it to find the end of a record.
static DOUBLE_LINE_REGEX: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"(\n|\r\n|$)(\n|\r\n|$)").unwrap());
/// Matches `EOF` preceded and followed by a line break; used instead of `DOUBLE_LINE_REGEX` to
/// end the expected output when the `multiline` option is set.
static EOF_REGEX: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"(\n|\r\n)EOF(\n|\r\n)").unwrap());
/// Matches a run of two or more spaces: the separator between the regex and the replacement
/// of a `replace` directive.
static REPLACE_SEP_REGEX: LazyLock<Regex> = LazyLock::new(|| Regex::new(r" {2,}").unwrap());
28
/// Incremental parser that turns the text of a test script into [`Record`]s, one
/// [`Parser::parse_record`] call at a time.
#[derive(Debug, Clone)]
pub struct Parser<'a> {
    /// The part of the input that has not been consumed yet.
    contents: &'a str,
    /// File name copied into every [`Location`] this parser produces.
    fname: String,
    /// Line number of the start of `contents`; starts at 1 and grows by one for every `\n`
    /// consumed.
    curline: usize,
    /// Current parse mode; starts as `Mode::Standard` and is changed by `mode` directives.
    mode: Mode,
}
36
37impl<'a> Parser<'a> {
38 pub fn new(fname: &str, contents: &'a str) -> Self {
39 Parser {
40 contents,
41 fname: fname.to_string(),
42 curline: 1,
43 mode: Mode::Standard,
44 }
45 }
46
47 pub fn is_done(&self) -> bool {
48 self.contents.is_empty()
49 }
50
51 pub fn location(&self) -> Location {
52 Location {
53 file: self.fname.clone(),
54 line: self.curline,
55 }
56 }
57
58 fn consume(&mut self, upto: usize) {
59 for ch in self.contents[..upto].chars() {
60 if ch == '\n' {
61 self.curline += 1;
62 }
63 }
64 self.contents = &self.contents[upto..];
65 }
66
67 pub fn split_at(&mut self, sep: &Regex) -> Result<&'a str, anyhow::Error> {
68 match sep.find(self.contents) {
69 Some(found) => {
70 let result = &self.contents[..found.start()];
71 self.consume(found.end());
72 Ok(result)
73 }
74 None => bail!("Couldn't split {:?} at {:?}", self.contents, sep),
75 }
76 }
77
78 pub fn parse_record(&mut self) -> Result<Record<'a>, anyhow::Error> {
79 if self.is_done() {
80 return Ok(Record::Halt);
81 }
82
83 let line_number = self.curline;
84
85 static COMMENT_AND_LINE_REGEX: LazyLock<Regex> =
86 LazyLock::new(|| Regex::new("(#[^\n]*)?\r?(\n|$)").unwrap());
87 let first_line = self.split_at(&COMMENT_AND_LINE_REGEX)?.trim();
88
89 if first_line.is_empty() {
90 return self.parse_record();
92 }
93
94 let mut words = first_line.split(' ').peekable();
95 match words.next().unwrap() {
96 "statement" => self.parse_statement(words, first_line),
97
98 "query" => self.parse_query(words, first_line),
99
100 "simple" => self.parse_simple(words),
101
102 "hash-threshold" => {
103 let threshold = words
104 .next()
105 .ok_or_else(|| anyhow!("missing threshold in: {}", first_line))?
106 .parse::<u64>()
107 .map_err(|err| anyhow!("invalid threshold ({}) in: {}", err, first_line))?;
108 Ok(Record::HashThreshold { threshold })
109 }
110
111 "skipif" => {
113 match words.next().unwrap() {
114 "postgresql" => {
115 self.parse_record()?;
117 self.parse_record()
118 }
119 _ => self.parse_record(),
120 }
121 }
122 "onlyif" => {
123 match words.next().unwrap() {
124 "postgresql" => self.parse_record(),
125 _ => {
126 self.parse_record()?;
128 self.parse_record()
129 }
130 }
131 }
132
133 "halt" => Ok(Record::Halt),
134
135 "subtest" | "user" | "kv-batch-size" => self.parse_record(),
137
138 "mode" => {
139 self.mode = match words.next() {
140 Some("cockroach") => Mode::Cockroach,
141 Some("standard") | Some("sqlite") => Mode::Standard,
142 other => bail!("unknown parse mode: {:?}", other),
143 };
144 self.parse_record()
145 }
146
147 "copy" => Ok(Record::Copy {
148 table_name: words
149 .next()
150 .ok_or_else(|| anyhow!("load directive missing table name"))?,
151 tsv_path: words
152 .next()
153 .ok_or_else(|| anyhow!("load directive missing TSV path"))?,
154 }),
155
156 "reset-server" => Ok(Record::ResetServer),
157
158 "replace" => {
163 let args = first_line
164 .strip_prefix("replace")
165 .expect("dispatched on \"replace\"")
166 .trim_start();
167 let mut parts = REPLACE_SEP_REGEX.splitn(args, 2);
168 let pattern = parts
169 .next()
170 .filter(|s| !s.is_empty())
171 .ok_or_else(|| anyhow!("replace directive missing regex in: {}", first_line))?;
172 let replacement = parts.next().ok_or_else(|| {
173 anyhow!(
174 "replace directive missing replacement (separate the regex \
175 and replacement with two or more spaces) in: {}",
176 first_line
177 )
178 })?;
179 Regex::new(pattern).map_err(|e| {
181 anyhow!("invalid regex {:?} in replace directive: {}", pattern, e)
182 })?;
183 Ok(Record::Replace {
184 pattern: pattern.to_owned(),
185 replacement: replacement.to_owned(),
186 })
187 }
188
189 other => bail!(
190 "Unexpected start of record on line {}: {}",
191 line_number,
192 other
193 ),
194 }
195 }
196
197 pub fn parse_records(&mut self) -> Result<Vec<Record<'a>>, anyhow::Error> {
198 let mut records = vec![];
199 loop {
200 match self.parse_record()? {
201 Record::Halt => break,
202 record => records.push(record),
203 }
204 }
205 Ok(records)
206 }
207
208 fn parse_statement(
209 &mut self,
210 mut words: impl Iterator<Item = &'a str>,
211 first_line: &'a str,
212 ) -> Result<Record<'a>, anyhow::Error> {
213 let location = self.location();
214 let mut expected_error = None;
215 let mut rows_affected = None;
216 match words.next() {
217 Some("count") => {
218 rows_affected = Some(
219 words
220 .next()
221 .ok_or_else(|| anyhow!("missing count of rows affected"))?
222 .parse::<u64>()
223 .map_err(|err| anyhow!("parsing count of rows affected: {}", err))?,
224 );
225 }
226 Some("ok") | Some("OK") => (),
227 Some("error") => expected_error = Some(parse_expected_error(first_line)),
228 _ => bail!("invalid statement disposition: {}", first_line),
229 };
230 let sql = self.split_at(&DOUBLE_LINE_REGEX)?;
231 Ok(Record::Statement {
232 expected_error,
233 rows_affected,
234 sql,
235 location,
236 })
237 }
238
239 fn parse_query(
240 &mut self,
241 mut words: std::iter::Peekable<impl Iterator<Item = &'a str>>,
242 first_line: &'a str,
243 ) -> Result<Record<'a>, anyhow::Error> {
244 let location = self.location();
245 if words.peek() == Some(&"error") {
246 let error = parse_expected_error(first_line);
247 let sql = self.split_at(&DOUBLE_LINE_REGEX)?;
248 return Ok(Record::Query {
249 sql,
250 output: Err(error),
251 location,
252 });
253 }
254
255 let types = words.next().map_or(Ok(vec![]), parse_types)?;
256 let mut sort = Sort::No;
257 let mut check_column_names = false;
258 let mut multiline = false;
259 if let Some(options) = words.next() {
260 for option in options.split(',') {
261 match option {
262 "nosort" => sort = Sort::No,
263 "rowsort" => sort = Sort::Row,
264 "valuesort" => sort = Sort::Value,
265 "colnames" => check_column_names = true,
266 "multiline" => multiline = true,
267 other => {
268 if other.starts_with("partialsort") {
269 sort = Sort::Row;
273 break;
274 } else {
275 bail!("Unrecognized option {:?} in {:?}", other, options);
276 }
277 }
278 };
279 }
280 }
281 if multiline && (check_column_names || sort.yes()) {
282 bail!("multiline option is incompatible with all other options");
283 }
284 let label = words.next();
285 static LINE_REGEX: LazyLock<Regex> = LazyLock::new(|| Regex::new("\r?(\n|$)").unwrap());
286 static HASH_REGEX: LazyLock<Regex> =
287 LazyLock::new(|| Regex::new(r"(\S+) values hashing to (\S+)").unwrap());
288 let sql = self.split_at(&QUERY_OUTPUT_REGEX)?;
289 let mut output_str = self.split_at(if multiline {
290 &EOF_REGEX
291 } else {
292 &DOUBLE_LINE_REGEX
293 })?;
294
295 output_str = if let Some(output_str_stripped) = regexp_strip_prefix(output_str, &LINE_REGEX)
299 {
300 output_str_stripped
301 } else {
302 assert!(output_str.is_empty());
307 output_str
308 };
309
310 let query_output_str = output_str;
314 let column_names = if check_column_names {
315 Some(
316 split_at(&mut output_str, &LINE_REGEX)?
317 .split(' ')
318 .filter(|s| !s.is_empty())
319 .map(|s| ColumnName::from(s.replace('␠', " ")))
320 .collect(),
321 )
322 } else {
323 None
324 };
325 let output = match HASH_REGEX.captures(output_str) {
326 Some(captures) => Output::Hashed {
327 num_values: captures.get(1).unwrap().as_str().parse::<usize>()?,
328 md5: captures.get(2).unwrap().as_str().to_owned(),
329 },
330 None => {
331 if multiline {
332 Output::Values(vec![output_str.to_owned()])
333 } else if output_str.starts_with('\r') || output_str.starts_with('\n') {
334 Output::Values(vec![])
335 } else {
336 let mut vals: Vec<String> = output_str.lines().map(|s| s.to_owned()).collect();
337 match self.mode {
338 Mode::Standard => {
339 if !multiline {
340 vals = vals.into_iter().map(|val| val.replace('⏎', "\n")).collect();
341 }
342
343 if sort == Sort::Value {
344 vals.sort();
345 }
346 }
347 Mode::Cockroach => {
348 let mut rows: Vec<Vec<String>> = vec![];
349 for line in vals {
350 let cols = split_cols(&line, types.len());
351 if sort != Sort::No && cols.len() != types.len() {
352 bail!(
358 "col len ({}) did not match declared col len ({})",
359 cols.len(),
360 types.len()
361 );
362 }
363 rows.push(
364 cols.into_iter()
365 .map(|col| {
366 let mut col = col.replace('␠', " ");
367 if !multiline {
368 col = col.replace('⏎', "\n");
369 }
370 col
371 })
372 .collect(),
373 );
374 }
375 if sort == Sort::Row {
376 rows.sort();
377 }
378 vals = rows.into_iter().flatten().collect();
379 if sort == Sort::Value {
380 vals.sort();
381 }
382 }
383 }
384 Output::Values(vals)
385 }
386 }
387 };
388 Ok(Record::Query {
389 sql,
390 output: Ok(QueryOutput {
391 types,
392 sort,
393 multiline,
394 label,
395 column_names,
396 mode: self.mode,
397 output,
398 output_str: query_output_str,
399 }),
400 location,
401 })
402 }
403
404 fn parse_simple(
405 &mut self,
406 mut words: std::iter::Peekable<impl Iterator<Item = &'a str>>,
407 ) -> Result<Record<'a>, anyhow::Error> {
408 let location = self.location();
409 let mut conn = None;
410 let mut user = None;
411 let mut password = None;
412 let mut multiline = false;
413 let mut sort = Sort::No;
414 if let Some(options) = words.next() {
415 for option in options.split(',') {
416 if let Some(value) = option.strip_prefix("conn=") {
417 conn = Some(value);
418 } else if let Some(value) = option.strip_prefix("user=") {
419 user = Some(value);
420 } else if let Some(value) = option.strip_prefix("password=") {
421 password = Some(value);
422 } else if option == "rowsort" {
423 sort = Sort::Row;
424 } else if option == "multiline" {
425 multiline = true;
426 } else {
427 bail!("Unrecognized option {:?} in {:?}", option, options);
428 }
429 }
430 }
431 if user.is_some() && conn.is_none() {
432 bail!("cannot set user without also setting conn");
433 }
434 if password.is_some() && user.is_none() {
435 bail!("cannot set password without also setting user");
436 }
437 let sql = self.split_at(&QUERY_OUTPUT_REGEX)?;
438 let output_str = self
439 .split_at(if multiline {
440 &EOF_REGEX
441 } else {
442 &DOUBLE_LINE_REGEX
443 })?
444 .trim_start();
445 let output = if multiline {
446 Output::Values({
447 let mut v = vec![output_str.to_owned()];
448 let complete_str = self.split_at(&DOUBLE_LINE_REGEX)?.trim_start();
450 v.extend(complete_str.lines().map(String::from));
451 v
452 })
453 } else {
454 let mut output_lines: Vec<String> = output_str.lines().map(String::from).collect();
457
458 if self.mode == Mode::Cockroach && sort == Sort::Row {
459 output_lines.sort();
460 }
461
462 Output::Values(output_lines)
463 };
464 Ok(Record::Simple {
465 location,
466 conn,
467 user,
468 password,
469 sql,
470 sort,
471 output,
472 output_str,
473 })
474 }
475}
476
477fn split_at<'a>(input: &mut &'a str, sep: &Regex) -> Result<&'a str, anyhow::Error> {
478 match sep.find(input) {
479 Some(found) => {
480 let result = &input[..found.start()];
481 *input = &input[found.end()..];
482 Ok(result)
483 }
484 None => bail!("Couldn't split {:?} at {:?}", input, sep),
485 }
486}
487
488fn parse_types(input: &str) -> Result<Vec<Type>, anyhow::Error> {
490 input
491 .chars()
492 .map(|char| {
493 Ok(match char {
494 'T' => Type::Text,
495 'I' => Type::Integer,
496 'R' => Type::Real,
497 'B' => Type::Bool,
498 'O' => Type::Oid,
499 _ => bail!("Unexpected type char {} in: {}", char, input),
500 })
501 })
502 .collect()
503}
504
505fn parse_expected_error(line: &str) -> &str {
506 static PGCODE_RE: LazyLock<Regex> =
507 LazyLock::new(|| Regex::new("(statement|query) error( pgcode [a-zA-Z0-9]{5})? ?").unwrap());
508 let pos = PGCODE_RE.find(line).unwrap().end();
511 &line[pos..]
512}
513
/// Splits one line of expected output into column values.
///
/// With exactly one expected column the whole trimmed line is the single value, so it may
/// contain whitespace; for any other count the line is split on runs of whitespace.
pub(crate) fn split_cols(line: &str, expected_columns: usize) -> Vec<&str> {
    match expected_columns {
        1 => vec![line.trim()],
        _ => line.split_whitespace().collect(),
    }
}
526
527pub fn regexp_strip_prefix<'a>(text: &'a str, regexp: &Regex) -> Option<&'a str> {
528 match regexp.find(text) {
529 Some(found) => {
530 if found.start() == 0 {
531 Some(&text[found.end()..])
532 } else {
533 None
534 }
535 }
536 None => None,
537 }
538}