1use std::borrow::ToOwned;
13use std::sync::LazyLock;
14
15use anyhow::{anyhow, bail};
16use mz_repr::ColumnName;
17use regex::Regex;
18
19use crate::ast::{Location, Mode, Output, QueryOutput, Record, Sort, Type};
20
/// Matches a line break (`\n` or `\r\n`) immediately followed by `----`. The query and
/// simple parsers split on it to separate the SQL text from what follows.
static QUERY_OUTPUT_REGEX: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\r?\n----").unwrap());
/// Matches two consecutive line terminators, where each terminator is `\n`, `\r\n`, or the
/// end of the input. The parsers split on it to find the end of a record's body.
static DOUBLE_LINE_REGEX: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"(\n|\r\n|$)(\n|\r\n|$)").unwrap());
/// Matches the text `EOF` with a line break (`\n` or `\r\n`) directly before and after it.
/// Used instead of `DOUBLE_LINE_REGEX` to end the output of `multiline` records.
static EOF_REGEX: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"(\n|\r\n)EOF(\n|\r\n)").unwrap());
25
/// Incremental parser over the text of one test file.
///
/// The parser borrows the file contents for `'a`; the string slices stored in the records it
/// produces borrow from that same text.
#[derive(Debug, Clone)]
pub struct Parser<'a> {
    /// The part of the input that has not been consumed yet.
    contents: &'a str,
    /// File name, copied into every `Location` handed out by `location()`.
    fname: String,
    /// Line number of the start of `contents`; starts at 1 and grows as newlines are consumed.
    curline: usize,
    /// Parse mode currently in effect; switched by the `mode` directive.
    mode: Mode,
}
33
34impl<'a> Parser<'a> {
35 pub fn new(fname: &str, contents: &'a str) -> Self {
36 Parser {
37 contents,
38 fname: fname.to_string(),
39 curline: 1,
40 mode: Mode::Standard,
41 }
42 }
43
    /// Reports whether the whole input has been consumed.
    pub fn is_done(&self) -> bool {
        self.contents.is_empty()
    }
47
48 pub fn location(&self) -> Location {
49 Location {
50 file: self.fname.clone(),
51 line: self.curline,
52 }
53 }
54
55 fn consume(&mut self, upto: usize) {
56 for ch in self.contents[..upto].chars() {
57 if ch == '\n' {
58 self.curline += 1;
59 }
60 }
61 self.contents = &self.contents[upto..];
62 }
63
64 pub fn split_at(&mut self, sep: &Regex) -> Result<&'a str, anyhow::Error> {
65 match sep.find(self.contents) {
66 Some(found) => {
67 let result = &self.contents[..found.start()];
68 self.consume(found.end());
69 Ok(result)
70 }
71 None => bail!("Couldn't split {:?} at {:?}", self.contents, sep),
72 }
73 }
74
75 pub fn parse_record(&mut self) -> Result<Record<'a>, anyhow::Error> {
76 if self.is_done() {
77 return Ok(Record::Halt);
78 }
79
80 let line_number = self.curline;
81
82 static COMMENT_AND_LINE_REGEX: LazyLock<Regex> =
83 LazyLock::new(|| Regex::new("(#[^\n]*)?\r?(\n|$)").unwrap());
84 let first_line = self.split_at(&COMMENT_AND_LINE_REGEX)?.trim();
85
86 if first_line.is_empty() {
87 return self.parse_record();
89 }
90
91 let mut words = first_line.split(' ').peekable();
92 match words.next().unwrap() {
93 "statement" => self.parse_statement(words, first_line),
94
95 "query" => self.parse_query(words, first_line),
96
97 "simple" => self.parse_simple(words),
98
99 "hash-threshold" => {
100 let threshold = words
101 .next()
102 .ok_or_else(|| anyhow!("missing threshold in: {}", first_line))?
103 .parse::<u64>()
104 .map_err(|err| anyhow!("invalid threshold ({}) in: {}", err, first_line))?;
105 Ok(Record::HashThreshold { threshold })
106 }
107
108 "skipif" => {
110 match words.next().unwrap() {
111 "postgresql" => {
112 self.parse_record()?;
114 self.parse_record()
115 }
116 _ => self.parse_record(),
117 }
118 }
119 "onlyif" => {
120 match words.next().unwrap() {
121 "postgresql" => self.parse_record(),
122 _ => {
123 self.parse_record()?;
125 self.parse_record()
126 }
127 }
128 }
129
130 "halt" => Ok(Record::Halt),
131
132 "subtest" | "user" | "kv-batch-size" => self.parse_record(),
134
135 "mode" => {
136 self.mode = match words.next() {
137 Some("cockroach") => Mode::Cockroach,
138 Some("standard") | Some("sqlite") => Mode::Standard,
139 other => bail!("unknown parse mode: {:?}", other),
140 };
141 self.parse_record()
142 }
143
144 "copy" => Ok(Record::Copy {
145 table_name: words
146 .next()
147 .ok_or_else(|| anyhow!("load directive missing table name"))?,
148 tsv_path: words
149 .next()
150 .ok_or_else(|| anyhow!("load directive missing TSV path"))?,
151 }),
152
153 "reset-server" => Ok(Record::ResetServer),
154
155 other => bail!(
156 "Unexpected start of record on line {}: {}",
157 line_number,
158 other
159 ),
160 }
161 }
162
163 pub fn parse_records(&mut self) -> Result<Vec<Record<'a>>, anyhow::Error> {
164 let mut records = vec![];
165 loop {
166 match self.parse_record()? {
167 Record::Halt => break,
168 record => records.push(record),
169 }
170 }
171 Ok(records)
172 }
173
174 fn parse_statement(
175 &mut self,
176 mut words: impl Iterator<Item = &'a str>,
177 first_line: &'a str,
178 ) -> Result<Record<'a>, anyhow::Error> {
179 let location = self.location();
180 let mut expected_error = None;
181 let mut rows_affected = None;
182 match words.next() {
183 Some("count") => {
184 rows_affected = Some(
185 words
186 .next()
187 .ok_or_else(|| anyhow!("missing count of rows affected"))?
188 .parse::<u64>()
189 .map_err(|err| anyhow!("parsing count of rows affected: {}", err))?,
190 );
191 }
192 Some("ok") | Some("OK") => (),
193 Some("error") => expected_error = Some(parse_expected_error(first_line)),
194 _ => bail!("invalid statement disposition: {}", first_line),
195 };
196 let sql = self.split_at(&DOUBLE_LINE_REGEX)?;
197 Ok(Record::Statement {
198 expected_error,
199 rows_affected,
200 sql,
201 location,
202 })
203 }
204
205 fn parse_query(
206 &mut self,
207 mut words: std::iter::Peekable<impl Iterator<Item = &'a str>>,
208 first_line: &'a str,
209 ) -> Result<Record<'a>, anyhow::Error> {
210 let location = self.location();
211 if words.peek() == Some(&"error") {
212 let error = parse_expected_error(first_line);
213 let sql = self.split_at(&DOUBLE_LINE_REGEX)?;
214 return Ok(Record::Query {
215 sql,
216 output: Err(error),
217 location,
218 });
219 }
220
221 let types = words.next().map_or(Ok(vec![]), parse_types)?;
222 let mut sort = Sort::No;
223 let mut check_column_names = false;
224 let mut multiline = false;
225 if let Some(options) = words.next() {
226 for option in options.split(',') {
227 match option {
228 "nosort" => sort = Sort::No,
229 "rowsort" => sort = Sort::Row,
230 "valuesort" => sort = Sort::Value,
231 "colnames" => check_column_names = true,
232 "multiline" => multiline = true,
233 other => {
234 if other.starts_with("partialsort") {
235 sort = Sort::Row;
239 break;
240 } else {
241 bail!("Unrecognized option {:?} in {:?}", other, options);
242 }
243 }
244 };
245 }
246 }
247 if multiline && (check_column_names || sort.yes()) {
248 bail!("multiline option is incompatible with all other options");
249 }
250 let label = words.next();
251 static LINE_REGEX: LazyLock<Regex> = LazyLock::new(|| Regex::new("\r?(\n|$)").unwrap());
252 static HASH_REGEX: LazyLock<Regex> =
253 LazyLock::new(|| Regex::new(r"(\S+) values hashing to (\S+)").unwrap());
254 let sql = self.split_at(&QUERY_OUTPUT_REGEX)?;
255 let mut output_str = self.split_at(if multiline {
256 &EOF_REGEX
257 } else {
258 &DOUBLE_LINE_REGEX
259 })?;
260
261 output_str = if let Some(output_str_stripped) = regexp_strip_prefix(output_str, &LINE_REGEX)
265 {
266 output_str_stripped
267 } else {
268 assert!(output_str.is_empty());
273 output_str
274 };
275
276 let query_output_str = output_str;
280 let column_names = if check_column_names {
281 Some(
282 split_at(&mut output_str, &LINE_REGEX)?
283 .split(' ')
284 .filter(|s| !s.is_empty())
285 .map(|s| ColumnName::from(s.replace('␠', " ")))
286 .collect(),
287 )
288 } else {
289 None
290 };
291 let output = match HASH_REGEX.captures(output_str) {
292 Some(captures) => Output::Hashed {
293 num_values: captures.get(1).unwrap().as_str().parse::<usize>()?,
294 md5: captures.get(2).unwrap().as_str().to_owned(),
295 },
296 None => {
297 if multiline {
298 Output::Values(vec![output_str.to_owned()])
299 } else if output_str.starts_with('\r') || output_str.starts_with('\n') {
300 Output::Values(vec![])
301 } else {
302 let mut vals: Vec<String> = output_str.lines().map(|s| s.to_owned()).collect();
303 match self.mode {
304 Mode::Standard => {
305 if !multiline {
306 vals = vals.into_iter().map(|val| val.replace('⏎', "\n")).collect();
307 }
308
309 if sort == Sort::Value {
310 vals.sort();
311 }
312 }
313 Mode::Cockroach => {
314 let mut rows: Vec<Vec<String>> = vec![];
315 for line in vals {
316 let cols = split_cols(&line, types.len());
317 if sort != Sort::No && cols.len() != types.len() {
318 bail!(
324 "col len ({}) did not match declared col len ({})",
325 cols.len(),
326 types.len()
327 );
328 }
329 rows.push(
330 cols.into_iter()
331 .map(|col| {
332 let mut col = col.replace('␠', " ");
333 if !multiline {
334 col = col.replace('⏎', "\n");
335 }
336 col
337 })
338 .collect(),
339 );
340 }
341 if sort == Sort::Row {
342 rows.sort();
343 }
344 vals = rows.into_iter().flatten().collect();
345 if sort == Sort::Value {
346 vals.sort();
347 }
348 }
349 }
350 Output::Values(vals)
351 }
352 }
353 };
354 Ok(Record::Query {
355 sql,
356 output: Ok(QueryOutput {
357 types,
358 sort,
359 multiline,
360 label,
361 column_names,
362 mode: self.mode,
363 output,
364 output_str: query_output_str,
365 }),
366 location,
367 })
368 }
369
370 fn parse_simple(
371 &mut self,
372 mut words: std::iter::Peekable<impl Iterator<Item = &'a str>>,
373 ) -> Result<Record<'a>, anyhow::Error> {
374 let location = self.location();
375 let mut conn = None;
376 let mut user = None;
377 let mut password = None;
378 let mut multiline = false;
379 let mut sort = Sort::No;
380 if let Some(options) = words.next() {
381 for option in options.split(',') {
382 if let Some(value) = option.strip_prefix("conn=") {
383 conn = Some(value);
384 } else if let Some(value) = option.strip_prefix("user=") {
385 user = Some(value);
386 } else if let Some(value) = option.strip_prefix("password=") {
387 password = Some(value);
388 } else if option == "rowsort" {
389 sort = Sort::Row;
390 } else if option == "multiline" {
391 multiline = true;
392 } else {
393 bail!("Unrecognized option {:?} in {:?}", option, options);
394 }
395 }
396 }
397 if user.is_some() && conn.is_none() {
398 bail!("cannot set user without also setting conn");
399 }
400 if password.is_some() && user.is_none() {
401 bail!("cannot set password without also setting user");
402 }
403 let sql = self.split_at(&QUERY_OUTPUT_REGEX)?;
404 let output_str = self
405 .split_at(if multiline {
406 &EOF_REGEX
407 } else {
408 &DOUBLE_LINE_REGEX
409 })?
410 .trim_start();
411 let output = if multiline {
412 Output::Values({
413 let mut v = vec![output_str.to_owned()];
414 let complete_str = self.split_at(&DOUBLE_LINE_REGEX)?.trim_start();
416 v.extend(complete_str.lines().map(String::from));
417 v
418 })
419 } else {
420 let mut output_lines: Vec<String> = output_str.lines().map(String::from).collect();
423
424 if self.mode == Mode::Cockroach && sort == Sort::Row {
425 output_lines.sort();
426 }
427
428 Output::Values(output_lines)
429 };
430 Ok(Record::Simple {
431 location,
432 conn,
433 user,
434 password,
435 sql,
436 sort,
437 output,
438 output_str,
439 })
440 }
441}
442
443fn split_at<'a>(input: &mut &'a str, sep: &Regex) -> Result<&'a str, anyhow::Error> {
444 match sep.find(input) {
445 Some(found) => {
446 let result = &input[..found.start()];
447 *input = &input[found.end()..];
448 Ok(result)
449 }
450 None => bail!("Couldn't split {:?} at {:?}", input, sep),
451 }
452}
453
454fn parse_types(input: &str) -> Result<Vec<Type>, anyhow::Error> {
456 input
457 .chars()
458 .map(|char| {
459 Ok(match char {
460 'T' => Type::Text,
461 'I' => Type::Integer,
462 'R' => Type::Real,
463 'B' => Type::Bool,
464 'O' => Type::Oid,
465 _ => bail!("Unexpected type char {} in: {}", char, input),
466 })
467 })
468 .collect()
469}
470
471fn parse_expected_error(line: &str) -> &str {
472 static PGCODE_RE: LazyLock<Regex> =
473 LazyLock::new(|| Regex::new("(statement|query) error( pgcode [a-zA-Z0-9]{5})? ?").unwrap());
474 let pos = PGCODE_RE.find(line).unwrap().end();
477 &line[pos..]
478}
479
/// Splits one line of output into its columns.
///
/// When exactly one column is expected, the whole line (with surrounding whitespace
/// trimmed) is that column, so whitespace inside the value is kept. For any other expected
/// count the line is split on runs of whitespace, which may yield a different number of
/// columns than expected.
pub(crate) fn split_cols(line: &str, expected_columns: usize) -> Vec<&str> {
    match expected_columns {
        1 => vec![line.trim()],
        _ => line.split_whitespace().collect(),
    }
}
492
493pub fn regexp_strip_prefix<'a>(text: &'a str, regexp: &Regex) -> Option<&'a str> {
494 match regexp.find(text) {
495 Some(found) => {
496 if found.start() == 0 {
497 Some(&text[found.end()..])
498 } else {
499 None
500 }
501 }
502 None => None,
503 }
504}