1use std::collections::BTreeMap;
35
36use anyhow::{Context, anyhow, bail, ensure};
37
38use crate::script::{BuildSpec, ColumnSpec, Command, ConfigSetting, ExportSpec, ImportSpec};
39
/// One top-level element of a parsed script file.
pub enum Item {
    /// A blank line or a line starting with `#`, stored as-is so that it can be written back
    /// unchanged.
    Verbatim(String),
    /// A directive together with the output recorded for it.
    Stanza(Stanza),
}
48
/// A directive and its recorded output, separated in the file by a `----` line.
pub struct Stanza {
    /// Raw text of the directive: every line before the `----` separator, joined with `\n`.
    pub input: String,
    /// Recorded output: the lines after `----` up to the next blank line, joined with `\n`.
    pub expected: String,
    /// The command parsed from `input`.
    pub command: Command,
}
58
59pub fn parse_file(content: &str) -> anyhow::Result<Vec<Item>> {
62 let lines: Vec<&str> = content.lines().collect();
63 let mut items = Vec::new();
64 let mut i = 0;
65 while i < lines.len() {
66 let line = lines[i];
67 if line.trim().is_empty() || line.starts_with('#') {
69 items.push(Item::Verbatim(line.to_string()));
70 i += 1;
71 continue;
72 }
73 let start = i;
75 while i < lines.len() && lines[i] != "----" {
76 i += 1;
77 }
78 ensure!(
79 i < lines.len(),
80 "stanza starting at line {} has no `----` separator",
81 start + 1
82 );
83 let input = lines[start..i].join("\n");
84 i += 1; let exp_start = i;
87 while i < lines.len() && !lines[i].trim().is_empty() {
88 i += 1;
89 }
90 let expected = lines[exp_start..i].join("\n");
91 let command = parse_command(&input)
92 .with_context(|| format!("parsing stanza at line {}", start + 1))?;
93 items.push(Item::Stanza(Stanza {
94 input,
95 expected,
96 command,
97 }));
98 }
99 Ok(items)
100}
101
102pub fn rewrite(items: &[Item], actuals: &[String]) -> String {
106 let mut out = String::new();
107 let mut next = 0;
108 for item in items {
109 match item {
110 Item::Verbatim(line) => {
111 out.push_str(line);
112 out.push('\n');
113 }
114 Item::Stanza(stanza) => {
115 let actual = &actuals[next];
116 next += 1;
117 out.push_str(&stanza.input);
118 out.push_str("\n----\n");
119 out.push_str(actual);
120 if !actual.is_empty() {
121 out.push('\n');
122 }
123 }
124 }
125 }
126 out
127}
128
/// A non-blank line of a directive, split into its indentation width and its trimmed text.
struct Line {
    /// Number of bytes of leading whitespace on the original line.
    indent: usize,
    /// The line with leading and trailing whitespace removed.
    text: String,
}
134
135fn lex(block: &str) -> Vec<Line> {
137 block
138 .lines()
139 .filter_map(|raw| {
140 let trimmed = raw.trim_end();
141 if trimmed.trim().is_empty() {
142 return None;
143 }
144 let indent = trimmed.len() - trimmed.trim_start().len();
145 Some(Line {
146 indent,
147 text: trimmed.trim_start().to_string(),
148 })
149 })
150 .collect()
151}
152
153fn group(lines: &[Line]) -> anyhow::Result<Vec<(&Line, &[Line])>> {
156 if lines.is_empty() {
157 return Ok(vec![]);
158 }
159 let base = lines.iter().map(|l| l.indent).min().expect("non-empty");
160 let mut groups = Vec::new();
161 let mut i = 0;
162 while i < lines.len() {
163 ensure!(
164 lines[i].indent == base,
165 "inconsistent indentation: `{}`",
166 lines[i].text
167 );
168 let start = i + 1;
169 let mut j = start;
170 while j < lines.len() && lines[j].indent > base {
171 j += 1;
172 }
173 groups.push((&lines[i], &lines[start..j]));
174 i = j;
175 }
176 Ok(groups)
177}
178
179fn body_text(lines: &[Line]) -> String {
182 if lines.is_empty() {
183 return String::new();
184 }
185 let base = lines.iter().map(|l| l.indent).min().expect("non-empty");
186 lines
187 .iter()
188 .map(|l| format!("{}{}", " ".repeat(l.indent - base), l.text))
189 .collect::<Vec<_>>()
190 .join("\n")
191}
192
193fn tokenize(s: &str) -> anyhow::Result<Vec<String>> {
196 let mut tokens = Vec::new();
197 let mut cur = String::new();
198 let mut depth = 0i32;
199 let mut in_quote = false;
200 for c in s.chars() {
201 match c {
202 '"' => {
203 in_quote = !in_quote;
204 cur.push(c);
205 }
206 '[' if !in_quote => {
207 depth += 1;
208 cur.push(c);
209 }
210 ']' if !in_quote => {
211 depth -= 1;
212 cur.push(c);
213 }
214 c if c.is_whitespace() && !in_quote && depth == 0 => {
215 if !cur.is_empty() {
216 tokens.push(std::mem::take(&mut cur));
217 }
218 }
219 c => cur.push(c),
220 }
221 }
222 ensure!(!in_quote, "unterminated string in `{s}`");
223 ensure!(depth == 0, "unterminated list in `{s}`");
224 if !cur.is_empty() {
225 tokens.push(cur);
226 }
227 Ok(tokens)
228}
229
230fn parse_header(header: &str) -> anyhow::Result<(String, BTreeMap<String, String>, Vec<String>)> {
232 let mut tokens = tokenize(header)?.into_iter();
233 let verb = tokens.next().context("empty command")?;
234 let mut args = BTreeMap::new();
235 let mut flags = Vec::new();
236 for token in tokens {
237 match token.split_once('=') {
238 Some((key, value)) => {
239 args.insert(key.to_string(), value.to_string());
240 }
241 None => flags.push(token),
242 }
243 }
244 Ok((verb, args, flags))
245}
246
247fn req<'a>(args: &'a BTreeMap<String, String>, key: &str) -> anyhow::Result<&'a str> {
248 args.get(key)
249 .map(String::as_str)
250 .ok_or_else(|| anyhow!("missing argument `{key}`"))
251}
252
253fn req_u64(args: &BTreeMap<String, String>, key: &str) -> anyhow::Result<u64> {
254 req(args, key)?
255 .parse()
256 .with_context(|| format!("argument `{key}` is not an integer"))
257}
258
259fn opt_u64(args: &BTreeMap<String, String>, key: &str) -> anyhow::Result<Option<u64>> {
260 args.get(key)
261 .map(|v| {
262 v.parse()
263 .with_context(|| format!("argument `{key}` is not an integer"))
264 })
265 .transpose()
266}
267
268fn opt_usize(args: &BTreeMap<String, String>, key: &str) -> anyhow::Result<Option<usize>> {
269 args.get(key)
270 .map(|v| {
271 v.parse()
272 .with_context(|| format!("argument `{key}` is not an integer"))
273 })
274 .transpose()
275}
276
/// Returns an owned copy of the optional argument `key`, or `None` if it is absent.
fn opt_string(args: &BTreeMap<String, String>, key: &str) -> Option<String> {
    let value = args.get(key)?;
    Some(value.clone())
}
280
281fn parse_usize_list(s: &str) -> anyhow::Result<Vec<usize>> {
283 let inner = s
284 .strip_prefix('[')
285 .and_then(|s| s.strip_suffix(']'))
286 .ok_or_else(|| anyhow!("expected a list like `[0,1]`, got `{s}`"))?;
287 if inner.trim().is_empty() {
288 return Ok(vec![]);
289 }
290 inner
291 .split(',')
292 .map(|part| {
293 part.trim()
294 .parse()
295 .with_context(|| format!("bad list element `{part}`"))
296 })
297 .collect()
298}
299
300fn parse_export(args: &BTreeMap<String, String>) -> anyhow::Result<ExportSpec> {
303 let on_id = req_u64(args, "on")?;
304 Ok(
305 match args.get("kind").map(String::as_str).unwrap_or("index") {
306 "index" => ExportSpec::Index {
307 index_id: req_u64(args, "index")?,
308 on_id,
309 key: parse_usize_list(req(args, "key")?)?,
310 },
311 "materialized-view" => ExportSpec::MaterializedView {
312 sink_id: req_u64(args, "sink")?,
313 on_id,
314 shard: req(args, "shard")?.to_string(),
315 schema: opt_string(args, "schema"),
316 },
317 "subscribe" => ExportSpec::Subscribe {
318 sink_id: req_u64(args, "sink")?,
319 on_id,
320 schema: opt_string(args, "schema"),
321 up_to: opt_u64(args, "up-to")?,
322 },
323 "copy-to" => bail!("copy-to export is not implemented"),
324 other => bail!("unknown export kind `{other}`"),
325 },
326 )
327}
328
329fn rows_from_body(body: &[Line]) -> anyhow::Result<Vec<Vec<String>>> {
332 body.iter().map(|l| tokenize(&l.text)).collect()
333}
334
335fn settings_from_body(body: &[Line]) -> anyhow::Result<Vec<ConfigSetting>> {
337 body.iter()
338 .map(|l| {
339 let tokens = tokenize(&l.text)?;
340 ensure!(
341 tokens.len() == 3,
342 "config setting needs `name type value`, got `{}`",
343 l.text
344 );
345 Ok(ConfigSetting {
346 name: tokens[0].clone(),
347 ty: tokens[1].clone(),
348 value: tokens[2].clone(),
349 })
350 })
351 .collect()
352}
353
354fn columns_from_body(body: &[Line]) -> anyhow::Result<Vec<ColumnSpec>> {
356 body.iter()
357 .map(|l| {
358 let tokens = tokenize(&l.text)?;
359 ensure!(
360 tokens.len() >= 2,
361 "column needs `name type [nullable]`, got `{}`",
362 l.text
363 );
364 Ok(ColumnSpec {
365 name: tokens[0].clone(),
366 ty: tokens[1].clone(),
367 nullable: tokens.get(2).is_some_and(|t| t == "nullable"),
368 })
369 })
370 .collect()
371}
372
373fn parse_create_dataflow(
376 args: &BTreeMap<String, String>,
377 flags: &[String],
378 body: &[Line],
379) -> anyhow::Result<Command> {
380 let name = opt_string(args, "name");
381 let as_of = req_u64(args, "as-of")?;
382 let optimize = flags.iter().any(|f| f == "optimize");
383 let mut imports = Vec::new();
384 let mut builds = Vec::new();
385 let mut exports = Vec::new();
386 for (header, sub_body) in group(body)? {
387 let (verb, args, _flags) = parse_header(&header.text)?;
388 match verb.as_str() {
389 "import" => {
390 if let Some(index_id) = args.get("index") {
391 imports.push(ImportSpec::Index {
392 index_id: index_id
393 .parse()
394 .with_context(|| format!("bad index id `{index_id}`"))?,
395 });
396 } else {
397 imports.push(ImportSpec::Source {
398 id: req_u64(&args, "source")?,
399 shard: req(&args, "shard")?.to_string(),
400 schema: opt_string(&args, "schema"),
401 upper: req_u64(&args, "upper")?,
402 });
403 }
404 }
405 "build" => {
406 ensure!(!sub_body.is_empty(), "`build` needs a MIR body");
407 builds.push(BuildSpec {
408 id: req_u64(&args, "id")?,
409 expr: body_text(sub_body),
410 });
411 }
412 "export" => exports.push(parse_export(&args)?),
413 other => bail!("unknown `create-dataflow` sub-command `{other}`"),
414 }
415 }
416 Ok(Command::CreateDataflow {
417 name,
418 imports,
419 builds,
420 exports,
421 as_of,
422 optimize,
423 })
424}
425
426fn parse_command(input: &str) -> anyhow::Result<Command> {
428 let lines = lex(input);
429 let (header, body) = lines.split_first().context("empty command")?;
430 ensure!(header.indent == 0, "directive must not be indented");
431 let (verb, args, flags) = parse_header(&header.text)?;
432 let command = match verb.as_str() {
433 "define-schema" => Command::DefineSchema {
434 name: req(&args, "name")?.to_string(),
435 columns: columns_from_body(body)?,
436 },
437 "write-single-ts" => Command::WriteSingleTs {
438 shard: req(&args, "shard")?.to_string(),
439 schema: opt_string(&args, "schema"),
440 ts: req_u64(&args, "ts")?,
441 count: req_u64(&args, "count")?,
442 start: opt_u64(&args, "start")?.unwrap_or(0),
443 row_bytes: opt_usize(&args, "row-bytes")?,
444 },
445 "write-spread" => Command::WriteSpread {
446 shard: req(&args, "shard")?.to_string(),
447 schema: opt_string(&args, "schema"),
448 count: req_u64(&args, "count")?,
449 n_ts: req_u64(&args, "n-ts")?,
450 start: opt_u64(&args, "start")?.unwrap_or(0),
451 row_bytes: opt_usize(&args, "row-bytes")?,
452 },
453 "write-rows" => Command::WriteRows {
454 shard: req(&args, "shard")?.to_string(),
455 schema: opt_string(&args, "schema"),
456 ts: req_u64(&args, "ts")?,
457 rows: rows_from_body(body)?,
458 },
459 "define-index" => Command::DefineIndex {
460 source_id: req_u64(&args, "source")?,
461 index_id: req_u64(&args, "index")?,
462 shard: req(&args, "shard")?.to_string(),
463 schema: opt_string(&args, "schema"),
464 key: parse_usize_list(req(&args, "key")?)?,
465 as_of: req_u64(&args, "as-of")?,
466 upper: req_u64(&args, "upper")?,
467 },
468 "schedule" => Command::Schedule {
469 id: req_u64(&args, "id")?,
470 },
471 "allow-compaction" => Command::AllowCompaction {
472 id: req_u64(&args, "id")?,
473 frontier: req_u64(&args, "frontier")?,
474 },
475 "allow-writes" => Command::AllowWrites {
476 id: req_u64(&args, "id")?,
477 },
478 "await-frontier" => Command::AwaitFrontier {
479 id: req_u64(&args, "id")?,
480 ts: req_u64(&args, "ts")?,
481 timeout_secs: opt_u64(&args, "timeout-secs")?,
482 allow_timeout: flags.iter().any(|f| f == "allow-timeout"),
483 },
484 "count" => Command::Count {
485 id: req_u64(&args, "id")?,
486 ts: req_u64(&args, "ts")?,
487 },
488 "peek" => Command::Peek {
489 id: req_u64(&args, "id")?,
490 schema: opt_string(&args, "schema"),
491 ts: req_u64(&args, "ts")?,
492 },
493 "await-subscribe" => Command::AwaitSubscribe {
494 id: req_u64(&args, "id")?,
495 up_to: req_u64(&args, "up-to")?,
496 timeout_secs: opt_u64(&args, "timeout-secs")?,
497 },
498 "create-dataflow" => parse_create_dataflow(&args, &flags, body)?,
499 "create-instance" => Command::CreateInstance {
500 expiration_offset: opt_string(&args, "expiration-offset"),
501 arrangement_dictionary_compression: args
502 .get("arrangement-dictionary-compression")
503 .map(|v| v.parse())
504 .transpose()
505 .context("argument `arrangement-dictionary-compression` is not a bool")?
506 .unwrap_or(false),
507 },
508 "update-configuration" => Command::UpdateConfiguration {
509 updates: settings_from_body(body)?,
510 },
511 "reconnect" => Command::Reconnect,
512 "initialization-complete" => Command::InitializationComplete,
513 other => bail!("unknown command `{other}`"),
514 };
515 Ok(command)
516}
517
#[cfg(test)]
mod tests {
    use super::*;

    /// A header with only `key=value` arguments; optional arguments take their defaults.
    #[mz_ore::test]
    fn parses_simple_command() {
        let cmd = parse_command("write-single-ts shard=data ts=0 count=5000").unwrap();
        assert_eq!(
            cmd,
            Command::WriteSingleTs {
                shard: "data".to_string(),
                schema: None,
                ts: 0,
                count: 5000,
                start: 0,
                row_bytes: None,
            }
        );
    }

    /// Bare flags and bracketed list arguments in a header.
    #[mz_ore::test]
    fn parses_flag_and_list() {
        let cmd =
            parse_command("await-frontier id=1001 ts=1 timeout-secs=3 allow-timeout").unwrap();
        assert_eq!(
            cmd,
            Command::AwaitFrontier {
                id: 1001,
                ts: 1,
                timeout_secs: Some(3),
                allow_timeout: true,
            }
        );
        let cmd =
            parse_command("define-index source=1000 index=1001 shard=d key=[0] as-of=0 upper=1")
                .unwrap();
        assert_eq!(
            cmd,
            Command::DefineIndex {
                source_id: 1000,
                index_id: 1001,
                shard: "d".to_string(),
                schema: None,
                key: vec![0],
                as_of: 0,
                upper: 1,
            }
        );
    }

    /// Commands whose body is a flat list of lines (columns, rows).
    #[mz_ore::test]
    fn parses_bodies() {
        let cmd = parse_command(
            "define-schema name=events\n key bigint\n flag boolean\n label text nullable",
        )
        .unwrap();
        assert_eq!(
            cmd,
            Command::DefineSchema {
                name: "events".to_string(),
                columns: vec![
                    ColumnSpec {
                        name: "key".to_string(),
                        ty: "bigint".to_string(),
                        nullable: false
                    },
                    ColumnSpec {
                        name: "flag".to_string(),
                        ty: "boolean".to_string(),
                        nullable: false
                    },
                    ColumnSpec {
                        name: "label".to_string(),
                        ty: "text".to_string(),
                        nullable: true
                    },
                ],
            }
        );

        let cmd = parse_command(
            "write-rows shard=ev schema=events ts=1\n 1000 true alpha\n 1001 false null",
        )
        .unwrap();
        assert_eq!(
            cmd,
            Command::WriteRows {
                shard: "ev".to_string(),
                schema: Some("events".to_string()),
                ts: 1,
                rows: vec![
                    vec!["1000".to_string(), "true".to_string(), "alpha".to_string()],
                    vec!["1001".to_string(), "false".to_string(), "null".to_string()],
                ],
            }
        );
    }

    /// `create-dataflow` nests by indentation: sub-commands sit one level below the header and
    /// a `build`'s MIR sits below the `build` line. The MIR keeps its own relative indentation.
    #[mz_ore::test]
    fn parses_create_dataflow_with_mir() {
        // One literal per line so the nesting, which the parser depends on, stays visible.
        let input = concat!(
            "create-dataflow name=count as-of=0\n",
            "  import index=1001\n",
            "  build id=2000\n",
            "    Reduce aggregates=[count(*)]\n",
            "      Get u1000\n",
            "  export index=2001 on=2000 key=[0]",
        );
        let cmd = parse_command(input).unwrap();
        assert_eq!(
            cmd,
            Command::CreateDataflow {
                name: Some("count".to_string()),
                imports: vec![ImportSpec::Index { index_id: 1001 }],
                builds: vec![BuildSpec {
                    id: 2000,
                    expr: "Reduce aggregates=[count(*)]\n  Get u1000".to_string(),
                }],
                exports: vec![ExportSpec::Index {
                    index_id: 2001,
                    on_id: 2000,
                    key: vec![0]
                }],
                as_of: 0,
                optimize: false,
            }
        );

        let optimized = parse_command(concat!(
            "create-dataflow name=j as-of=0 optimize\n",
            "  import source=1000 shard=l upper=1\n",
            "  build id=2000\n",
            "    Get u1000\n",
            "  export index=2001 on=2000 key=[0]",
        ))
        .unwrap();
        assert!(matches!(
            optimized,
            Command::CreateDataflow { optimize: true, .. }
        ));
    }

    /// The non-index export kinds, including the rejected `copy-to`.
    #[mz_ore::test]
    fn parses_sink_export_kinds() {
        let mv = concat!(
            "create-dataflow name=mv as-of=0\n",
            "  import source=1000 shard=r upper=1\n",
            "  build id=2000\n",
            "    Get u1000\n",
            "  export kind=materialized-view sink=2001 on=2000 shard=out schema=kv",
        );
        let Command::CreateDataflow { exports, .. } = parse_command(mv).unwrap() else {
            panic!("expected create-dataflow");
        };
        assert_eq!(
            exports,
            vec![ExportSpec::MaterializedView {
                sink_id: 2001,
                on_id: 2000,
                shard: "out".to_string(),
                schema: Some("kv".to_string()),
            }]
        );

        let sub = concat!(
            "create-dataflow name=sub as-of=0\n",
            "  import source=1000 shard=s upper=2\n",
            "  build id=2000\n",
            "    Get u1000\n",
            "  export kind=subscribe sink=2001 on=2000 up-to=2",
        );
        let Command::CreateDataflow { exports, .. } = parse_command(sub).unwrap() else {
            panic!("expected create-dataflow");
        };
        assert_eq!(
            exports,
            vec![ExportSpec::Subscribe {
                sink_id: 2001,
                on_id: 2000,
                schema: None,
                up_to: Some(2),
            }]
        );

        let copy = concat!(
            "create-dataflow name=c as-of=0\n",
            "  import source=1000 shard=s upper=1\n",
            "  build id=2000\n",
            "    Get u1000\n",
            "  export kind=copy-to sink=2001 on=2000",
        );
        // Check the message so the test cannot pass because of an unrelated parse error.
        let err = parse_command(copy).unwrap_err();
        assert!(
            err.to_string().contains("copy-to"),
            "unexpected error: {err}"
        );
    }

    /// `create-instance` defaults and `update-configuration` bodies.
    #[mz_ore::test]
    fn parses_handshake_config() {
        assert_eq!(
            parse_command("create-instance").unwrap(),
            Command::CreateInstance {
                expiration_offset: None,
                arrangement_dictionary_compression: false,
            }
        );
        assert_eq!(
            parse_command(
                "create-instance expiration-offset=30s arrangement-dictionary-compression=true"
            )
            .unwrap(),
            Command::CreateInstance {
                expiration_offset: Some("30s".to_string()),
                arrangement_dictionary_compression: true,
            }
        );

        assert_eq!(
            parse_command("update-configuration").unwrap(),
            Command::UpdateConfiguration { updates: vec![] }
        );
        assert_eq!(
            parse_command("update-configuration\n enable_my_flag bool true\n my_dur duration 1s")
                .unwrap(),
            Command::UpdateConfiguration {
                updates: vec![
                    ConfigSetting {
                        name: "enable_my_flag".to_string(),
                        ty: "bool".to_string(),
                        value: "true".to_string(),
                    },
                    ConfigSetting {
                        name: "my_dur".to_string(),
                        ty: "duration".to_string(),
                        value: "1s".to_string(),
                    },
                ],
            }
        );
    }

    #[mz_ore::test]
    fn parses_await_subscribe() {
        let cmd = parse_command("await-subscribe id=2001 up-to=2 timeout-secs=5").unwrap();
        assert_eq!(
            cmd,
            Command::AwaitSubscribe {
                id: 2001,
                up_to: 2,
                timeout_secs: Some(5),
            }
        );
    }

    /// Parsing a file and rewriting it with unchanged outputs reproduces the file exactly.
    #[mz_ore::test]
    fn parses_and_rewrites_file() {
        let content =
            "# a comment\nschedule id=1001\n----\nok\n\ncount id=1001 ts=5\n----\n10000\n";
        let items = parse_file(content).unwrap();
        let stanzas: Vec<_> = items
            .iter()
            .filter_map(|i| match i {
                Item::Stanza(s) => Some(s),
                Item::Verbatim(_) => None,
            })
            .collect();
        assert_eq!(stanzas.len(), 2);
        assert_eq!(stanzas[0].command, Command::Schedule { id: 1001 });
        assert_eq!(stanzas[0].expected, "ok");
        assert_eq!(stanzas[1].expected, "10000");

        let actuals = vec!["ok".to_string(), "10000".to_string()];
        assert_eq!(rewrite(&items, &actuals), content);
    }
}