Skip to main content

mz_clusterd_test_driver/
text.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The text script format: a hand-writable, `datadriven`-style command file.
11//!
12//! A script is a sequence of stanzas, each a command and its expected output:
13//!
14//! ```text
15//! write-single-ts shard=data ts=0 count=5000
16//! ----
17//! wrote 5000
18//!
19//! count id=1001 ts=5
20//! ----
21//! 10000
22//! ```
23//!
24//! A stanza is a command (a directive line plus an optional indentation-structured
25//! body) up to a `----` separator, then the expected output up to a blank line. The
26//! `----` block is the assertion; `REWRITE=1` regenerates it (see [`crate::script`]).
27//! A `#` at column 0 is a comment; an indented `#0` is a column reference in MIR.
28//! Comments and blank lines are preserved across a rewrite.
29//!
//! Command bodies are indentation-structured: `define-schema`/`write-rows`/
//! `update-configuration` carry columns, rows, or settings, and `create-dataflow`
//! carries `import`/`build`/`export` sub-commands, with a `build`'s MIR as its own
//! deeper-indented sub-body.
33
34use std::collections::BTreeMap;
35
36use anyhow::{Context, anyhow, bail, ensure};
37
38use crate::script::{BuildSpec, ColumnSpec, Command, ConfigSetting, ExportSpec, ImportSpec};
39
40/// One element of a parsed script file, retained so a `REWRITE` reproduces the
41/// file faithfully.
42pub enum Item {
43    /// A blank line or column-0 `#` comment, kept verbatim.
44    Verbatim(String),
45    /// A command and its expected output.
46    Stanza(Stanza),
47}
48
49/// A command stanza: the input block, the expected output, and the parsed command.
50pub struct Stanza {
51    /// The directive line plus body, verbatim (for rewrite).
52    pub input: String,
53    /// The expected output block (no trailing newline).
54    pub expected: String,
55    /// The parsed command.
56    pub command: Command,
57}
58
59/// Parse a script file into items: each command stanza plus the comments and blank
60/// lines around it.
61pub fn parse_file(content: &str) -> anyhow::Result<Vec<Item>> {
62    let lines: Vec<&str> = content.lines().collect();
63    let mut items = Vec::new();
64    let mut i = 0;
65    while i < lines.len() {
66        let line = lines[i];
67        // Blank lines and column-0 comments are preserved as-is.
68        if line.trim().is_empty() || line.starts_with('#') {
69            items.push(Item::Verbatim(line.to_string()));
70            i += 1;
71            continue;
72        }
73        // A stanza: slurp the input block up to the `----` separator.
74        let start = i;
75        while i < lines.len() && lines[i] != "----" {
76            i += 1;
77        }
78        ensure!(
79            i < lines.len(),
80            "stanza starting at line {} has no `----` separator",
81            start + 1
82        );
83        let input = lines[start..i].join("\n");
84        i += 1; // consume `----`
85        // The expected output runs to the next blank line (or end of file).
86        let exp_start = i;
87        while i < lines.len() && !lines[i].trim().is_empty() {
88            i += 1;
89        }
90        let expected = lines[exp_start..i].join("\n");
91        let command = parse_command(&input)
92            .with_context(|| format!("parsing stanza at line {}", start + 1))?;
93        items.push(Item::Stanza(Stanza {
94            input,
95            expected,
96            command,
97        }));
98    }
99    Ok(items)
100}
101
102/// Reproduce a script file with each stanza's expected output replaced by its
103/// actual output, for `REWRITE`. `actuals` has one entry per [`Item::Stanza`], in
104/// order.
105pub fn rewrite(items: &[Item], actuals: &[String]) -> String {
106    let mut out = String::new();
107    let mut next = 0;
108    for item in items {
109        match item {
110            Item::Verbatim(line) => {
111                out.push_str(line);
112                out.push('\n');
113            }
114            Item::Stanza(stanza) => {
115                let actual = &actuals[next];
116                next += 1;
117                out.push_str(&stanza.input);
118                out.push_str("\n----\n");
119                out.push_str(actual);
120                if !actual.is_empty() {
121                    out.push('\n');
122                }
123            }
124        }
125    }
126    out
127}
128
/// One non-blank line of a command block, split into indentation and content.
struct Line {
    /// Length of the line's leading whitespace.
    indent: usize,
    /// The line's content with surrounding whitespace removed.
    text: String,
}
134
135/// Split a command block into non-blank lines, recording each line's indentation.
136fn lex(block: &str) -> Vec<Line> {
137    block
138        .lines()
139        .filter_map(|raw| {
140            let trimmed = raw.trim_end();
141            if trimmed.trim().is_empty() {
142                return None;
143            }
144            let indent = trimmed.len() - trimmed.trim_start().len();
145            Some(Line {
146                indent,
147                text: trimmed.trim_start().to_string(),
148            })
149        })
150        .collect()
151}
152
153/// Split body lines into `(header, body)` groups: each header is a line at the
154/// minimum indentation, owning the following more-indented lines as its body.
155fn group(lines: &[Line]) -> anyhow::Result<Vec<(&Line, &[Line])>> {
156    if lines.is_empty() {
157        return Ok(vec![]);
158    }
159    let base = lines.iter().map(|l| l.indent).min().expect("non-empty");
160    let mut groups = Vec::new();
161    let mut i = 0;
162    while i < lines.len() {
163        ensure!(
164            lines[i].indent == base,
165            "inconsistent indentation: `{}`",
166            lines[i].text
167        );
168        let start = i + 1;
169        let mut j = start;
170        while j < lines.len() && lines[j].indent > base {
171            j += 1;
172        }
173        groups.push((&lines[i], &lines[start..j]));
174        i = j;
175    }
176    Ok(groups)
177}
178
179/// Render body lines as text, dedented to the body's minimum indentation so their
180/// relative structure (which MIR depends on) is preserved.
181fn body_text(lines: &[Line]) -> String {
182    if lines.is_empty() {
183        return String::new();
184    }
185    let base = lines.iter().map(|l| l.indent).min().expect("non-empty");
186    lines
187        .iter()
188        .map(|l| format!("{}{}", " ".repeat(l.indent - base), l.text))
189        .collect::<Vec<_>>()
190        .join("\n")
191}
192
193/// Split a header line into whitespace-separated tokens, keeping `"..."` strings
194/// and `[...]` lists intact.
195fn tokenize(s: &str) -> anyhow::Result<Vec<String>> {
196    let mut tokens = Vec::new();
197    let mut cur = String::new();
198    let mut depth = 0i32;
199    let mut in_quote = false;
200    for c in s.chars() {
201        match c {
202            '"' => {
203                in_quote = !in_quote;
204                cur.push(c);
205            }
206            '[' if !in_quote => {
207                depth += 1;
208                cur.push(c);
209            }
210            ']' if !in_quote => {
211                depth -= 1;
212                cur.push(c);
213            }
214            c if c.is_whitespace() && !in_quote && depth == 0 => {
215                if !cur.is_empty() {
216                    tokens.push(std::mem::take(&mut cur));
217                }
218            }
219            c => cur.push(c),
220        }
221    }
222    ensure!(!in_quote, "unterminated string in `{s}`");
223    ensure!(depth == 0, "unterminated list in `{s}`");
224    if !cur.is_empty() {
225        tokens.push(cur);
226    }
227    Ok(tokens)
228}
229
230/// Parse a header line into its verb, `key=value` arguments, and bare flags.
231fn parse_header(header: &str) -> anyhow::Result<(String, BTreeMap<String, String>, Vec<String>)> {
232    let mut tokens = tokenize(header)?.into_iter();
233    let verb = tokens.next().context("empty command")?;
234    let mut args = BTreeMap::new();
235    let mut flags = Vec::new();
236    for token in tokens {
237        match token.split_once('=') {
238            Some((key, value)) => {
239                args.insert(key.to_string(), value.to_string());
240            }
241            None => flags.push(token),
242        }
243    }
244    Ok((verb, args, flags))
245}
246
247fn req<'a>(args: &'a BTreeMap<String, String>, key: &str) -> anyhow::Result<&'a str> {
248    args.get(key)
249        .map(String::as_str)
250        .ok_or_else(|| anyhow!("missing argument `{key}`"))
251}
252
253fn req_u64(args: &BTreeMap<String, String>, key: &str) -> anyhow::Result<u64> {
254    req(args, key)?
255        .parse()
256        .with_context(|| format!("argument `{key}` is not an integer"))
257}
258
259fn opt_u64(args: &BTreeMap<String, String>, key: &str) -> anyhow::Result<Option<u64>> {
260    args.get(key)
261        .map(|v| {
262            v.parse()
263                .with_context(|| format!("argument `{key}` is not an integer"))
264        })
265        .transpose()
266}
267
268fn opt_usize(args: &BTreeMap<String, String>, key: &str) -> anyhow::Result<Option<usize>> {
269    args.get(key)
270        .map(|v| {
271            v.parse()
272                .with_context(|| format!("argument `{key}` is not an integer"))
273        })
274        .transpose()
275}
276
/// Return an owned copy of an optional argument, or `None` if it is absent.
fn opt_string(args: &BTreeMap<String, String>, key: &str) -> Option<String> {
    let value = args.get(key)?;
    Some(value.clone())
}
280
281/// Parse a `[a,b,c]` list of `usize`s (`[]` is empty).
282fn parse_usize_list(s: &str) -> anyhow::Result<Vec<usize>> {
283    let inner = s
284        .strip_prefix('[')
285        .and_then(|s| s.strip_suffix(']'))
286        .ok_or_else(|| anyhow!("expected a list like `[0,1]`, got `{s}`"))?;
287    if inner.trim().is_empty() {
288        return Ok(vec![]);
289    }
290    inner
291        .split(',')
292        .map(|part| {
293            part.trim()
294                .parse()
295                .with_context(|| format!("bad list element `{part}`"))
296        })
297        .collect()
298}
299
300/// Parse an `export` sub-command into an [`ExportSpec`]. The `kind=` argument
301/// selects the variant (defaulting to `index`); each kind takes its own arguments.
302fn parse_export(args: &BTreeMap<String, String>) -> anyhow::Result<ExportSpec> {
303    let on_id = req_u64(args, "on")?;
304    Ok(
305        match args.get("kind").map(String::as_str).unwrap_or("index") {
306            "index" => ExportSpec::Index {
307                index_id: req_u64(args, "index")?,
308                on_id,
309                key: parse_usize_list(req(args, "key")?)?,
310            },
311            "materialized-view" => ExportSpec::MaterializedView {
312                sink_id: req_u64(args, "sink")?,
313                on_id,
314                shard: req(args, "shard")?.to_string(),
315                schema: opt_string(args, "schema"),
316            },
317            "subscribe" => ExportSpec::Subscribe {
318                sink_id: req_u64(args, "sink")?,
319                on_id,
320                schema: opt_string(args, "schema"),
321                up_to: opt_u64(args, "up-to")?,
322            },
323            "copy-to" => bail!("copy-to export is not implemented"),
324            other => bail!("unknown export kind `{other}`"),
325        },
326    )
327}
328
329/// Parse body lines as rows of raw space-separated value tokens (quotes intact).
330/// The tokens are typed against the schema server-side; see `cell_from_token`.
331fn rows_from_body(body: &[Line]) -> anyhow::Result<Vec<Vec<String>>> {
332    body.iter().map(|l| tokenize(&l.text)).collect()
333}
334
335/// Parse body lines as dyncfg settings: `name type value`.
336fn settings_from_body(body: &[Line]) -> anyhow::Result<Vec<ConfigSetting>> {
337    body.iter()
338        .map(|l| {
339            let tokens = tokenize(&l.text)?;
340            ensure!(
341                tokens.len() == 3,
342                "config setting needs `name type value`, got `{}`",
343                l.text
344            );
345            Ok(ConfigSetting {
346                name: tokens[0].clone(),
347                ty: tokens[1].clone(),
348                value: tokens[2].clone(),
349            })
350        })
351        .collect()
352}
353
354/// Parse body lines as column declarations: `name type [nullable]`.
355fn columns_from_body(body: &[Line]) -> anyhow::Result<Vec<ColumnSpec>> {
356    body.iter()
357        .map(|l| {
358            let tokens = tokenize(&l.text)?;
359            ensure!(
360                tokens.len() >= 2,
361                "column needs `name type [nullable]`, got `{}`",
362                l.text
363            );
364            Ok(ColumnSpec {
365                name: tokens[0].clone(),
366                ty: tokens[1].clone(),
367                nullable: tokens.get(2).is_some_and(|t| t == "nullable"),
368            })
369        })
370        .collect()
371}
372
373/// Parse a `create-dataflow` body of `import`/`build`/`export` sub-commands. The
374/// directive's bare flags carry the dataflow-level options (`optimize`).
375fn parse_create_dataflow(
376    args: &BTreeMap<String, String>,
377    flags: &[String],
378    body: &[Line],
379) -> anyhow::Result<Command> {
380    let name = opt_string(args, "name");
381    let as_of = req_u64(args, "as-of")?;
382    let optimize = flags.iter().any(|f| f == "optimize");
383    let mut imports = Vec::new();
384    let mut builds = Vec::new();
385    let mut exports = Vec::new();
386    for (header, sub_body) in group(body)? {
387        let (verb, args, _flags) = parse_header(&header.text)?;
388        match verb.as_str() {
389            "import" => {
390                if let Some(index_id) = args.get("index") {
391                    imports.push(ImportSpec::Index {
392                        index_id: index_id
393                            .parse()
394                            .with_context(|| format!("bad index id `{index_id}`"))?,
395                    });
396                } else {
397                    imports.push(ImportSpec::Source {
398                        id: req_u64(&args, "source")?,
399                        shard: req(&args, "shard")?.to_string(),
400                        schema: opt_string(&args, "schema"),
401                        upper: req_u64(&args, "upper")?,
402                    });
403                }
404            }
405            "build" => {
406                ensure!(!sub_body.is_empty(), "`build` needs a MIR body");
407                builds.push(BuildSpec {
408                    id: req_u64(&args, "id")?,
409                    expr: body_text(sub_body),
410                });
411            }
412            "export" => exports.push(parse_export(&args)?),
413            other => bail!("unknown `create-dataflow` sub-command `{other}`"),
414        }
415    }
416    Ok(Command::CreateDataflow {
417        name,
418        imports,
419        builds,
420        exports,
421        as_of,
422        optimize,
423    })
424}
425
426/// Parse one command block (directive line plus indentation-structured body).
427fn parse_command(input: &str) -> anyhow::Result<Command> {
428    let lines = lex(input);
429    let (header, body) = lines.split_first().context("empty command")?;
430    ensure!(header.indent == 0, "directive must not be indented");
431    let (verb, args, flags) = parse_header(&header.text)?;
432    let command = match verb.as_str() {
433        "define-schema" => Command::DefineSchema {
434            name: req(&args, "name")?.to_string(),
435            columns: columns_from_body(body)?,
436        },
437        "write-single-ts" => Command::WriteSingleTs {
438            shard: req(&args, "shard")?.to_string(),
439            schema: opt_string(&args, "schema"),
440            ts: req_u64(&args, "ts")?,
441            count: req_u64(&args, "count")?,
442            start: opt_u64(&args, "start")?.unwrap_or(0),
443            row_bytes: opt_usize(&args, "row-bytes")?,
444        },
445        "write-spread" => Command::WriteSpread {
446            shard: req(&args, "shard")?.to_string(),
447            schema: opt_string(&args, "schema"),
448            count: req_u64(&args, "count")?,
449            n_ts: req_u64(&args, "n-ts")?,
450            start: opt_u64(&args, "start")?.unwrap_or(0),
451            row_bytes: opt_usize(&args, "row-bytes")?,
452        },
453        "write-rows" => Command::WriteRows {
454            shard: req(&args, "shard")?.to_string(),
455            schema: opt_string(&args, "schema"),
456            ts: req_u64(&args, "ts")?,
457            rows: rows_from_body(body)?,
458        },
459        "define-index" => Command::DefineIndex {
460            source_id: req_u64(&args, "source")?,
461            index_id: req_u64(&args, "index")?,
462            shard: req(&args, "shard")?.to_string(),
463            schema: opt_string(&args, "schema"),
464            key: parse_usize_list(req(&args, "key")?)?,
465            as_of: req_u64(&args, "as-of")?,
466            upper: req_u64(&args, "upper")?,
467        },
468        "schedule" => Command::Schedule {
469            id: req_u64(&args, "id")?,
470        },
471        "allow-compaction" => Command::AllowCompaction {
472            id: req_u64(&args, "id")?,
473            frontier: req_u64(&args, "frontier")?,
474        },
475        "allow-writes" => Command::AllowWrites {
476            id: req_u64(&args, "id")?,
477        },
478        "await-frontier" => Command::AwaitFrontier {
479            id: req_u64(&args, "id")?,
480            ts: req_u64(&args, "ts")?,
481            timeout_secs: opt_u64(&args, "timeout-secs")?,
482            allow_timeout: flags.iter().any(|f| f == "allow-timeout"),
483        },
484        "count" => Command::Count {
485            id: req_u64(&args, "id")?,
486            ts: req_u64(&args, "ts")?,
487        },
488        "peek" => Command::Peek {
489            id: req_u64(&args, "id")?,
490            schema: opt_string(&args, "schema"),
491            ts: req_u64(&args, "ts")?,
492        },
493        "await-subscribe" => Command::AwaitSubscribe {
494            id: req_u64(&args, "id")?,
495            up_to: req_u64(&args, "up-to")?,
496            timeout_secs: opt_u64(&args, "timeout-secs")?,
497        },
498        "create-dataflow" => parse_create_dataflow(&args, &flags, body)?,
499        "create-instance" => Command::CreateInstance {
500            expiration_offset: opt_string(&args, "expiration-offset"),
501            arrangement_dictionary_compression: args
502                .get("arrangement-dictionary-compression")
503                .map(|v| v.parse())
504                .transpose()
505                .context("argument `arrangement-dictionary-compression` is not a bool")?
506                .unwrap_or(false),
507        },
508        "update-configuration" => Command::UpdateConfiguration {
509            updates: settings_from_body(body)?,
510        },
511        "reconnect" => Command::Reconnect,
512        "initialization-complete" => Command::InitializationComplete,
513        other => bail!("unknown command `{other}`"),
514    };
515    Ok(command)
516}
517
#[cfg(test)]
mod tests {
    use super::*;

    /// Parse a command block that the test expects to be valid.
    #[track_caller]
    fn parsed(input: &str) -> Command {
        parse_command(input).unwrap()
    }

    /// Assemble a multi-line command block from its individual lines.
    fn block(lines: &[&str]) -> String {
        lines.join("\n")
    }

    /// Parse a `create-dataflow` block and hand back only its exports.
    #[track_caller]
    fn exports_of(input: &str) -> Vec<ExportSpec> {
        match parsed(input) {
            Command::CreateDataflow { exports, .. } => exports,
            _ => panic!("expected create-dataflow"),
        }
    }

    /// Shorthand for an expected column declaration.
    fn column(name: &str, ty: &str, nullable: bool) -> ColumnSpec {
        ColumnSpec {
            name: name.to_string(),
            ty: ty.to_string(),
            nullable,
        }
    }

    /// Shorthand for an expected configuration setting.
    fn setting(name: &str, ty: &str, value: &str) -> ConfigSetting {
        ConfigSetting {
            name: name.to_string(),
            ty: ty.to_string(),
            value: value.to_string(),
        }
    }

    /// Required arguments are read from the directive and absent optional ones
    /// take their defaults.
    #[mz_ore::test]
    fn parses_simple_command() {
        let expected = Command::WriteSingleTs {
            shard: "data".to_string(),
            schema: None,
            ts: 0,
            count: 5000,
            start: 0,
            row_bytes: None,
        };
        assert_eq!(
            parsed("write-single-ts shard=data ts=0 count=5000"),
            expected
        );
    }

    /// Bare flags and `[...]` list arguments are both understood.
    #[mz_ore::test]
    fn parses_flag_and_list() {
        let expected = Command::AwaitFrontier {
            id: 1001,
            ts: 1,
            timeout_secs: Some(3),
            allow_timeout: true,
        };
        assert_eq!(
            parsed("await-frontier id=1001 ts=1 timeout-secs=3 allow-timeout"),
            expected
        );

        let expected = Command::DefineIndex {
            source_id: 1000,
            index_id: 1001,
            shard: "d".to_string(),
            schema: None,
            key: vec![0],
            as_of: 0,
            upper: 1,
        };
        assert_eq!(
            parsed("define-index source=1000 index=1001 shard=d key=[0] as-of=0 upper=1"),
            expected
        );
    }

    /// `define-schema` and `write-rows` read their indented bodies; row values
    /// are kept as raw tokens.
    #[mz_ore::test]
    fn parses_bodies() {
        let schema = block(&[
            "define-schema name=events",
            "  key bigint",
            "  flag boolean",
            "  label text nullable",
        ]);
        let expected = Command::DefineSchema {
            name: "events".to_string(),
            columns: vec![
                column("key", "bigint", false),
                column("flag", "boolean", false),
                column("label", "text", true),
            ],
        };
        assert_eq!(parsed(&schema), expected);

        let rows = block(&[
            "write-rows shard=ev schema=events ts=1",
            "  1000 true alpha",
            "  1001 false null",
        ]);
        let row = |cells: [&str; 3]| -> Vec<String> {
            cells.iter().map(|cell| cell.to_string()).collect()
        };
        let expected = Command::WriteRows {
            shard: "ev".to_string(),
            schema: Some("events".to_string()),
            ts: 1,
            rows: vec![
                row(["1000", "true", "alpha"]),
                row(["1001", "false", "null"]),
            ],
        };
        assert_eq!(parsed(&rows), expected);
    }

    /// `create-dataflow` collects its sub-commands; a `build`'s MIR keeps its
    /// relative indentation, and an export without `kind=` is an index export.
    #[mz_ore::test]
    fn parses_create_dataflow_with_mir() {
        let input = block(&[
            "create-dataflow name=count as-of=0",
            "  import index=1001",
            "  build id=2000",
            "    Reduce aggregates=[count(*)]",
            "      Get u1000",
            "  export index=2001 on=2000 key=[0]",
        ]);
        let expected = Command::CreateDataflow {
            name: Some("count".to_string()),
            imports: vec![ImportSpec::Index { index_id: 1001 }],
            builds: vec![BuildSpec {
                id: 2000,
                expr: "Reduce aggregates=[count(*)]\n  Get u1000".to_string(),
            }],
            exports: vec![ExportSpec::Index {
                index_id: 2001,
                on_id: 2000,
                key: vec![0],
            }],
            as_of: 0,
            optimize: false,
        };
        assert_eq!(parsed(&input), expected);

        // A bare `optimize` on the directive line turns the option on.
        let input = block(&[
            "create-dataflow name=j as-of=0 optimize",
            "  import source=1000 shard=l upper=1",
            "  build id=2000",
            "    Get u1000",
            "  export index=2001 on=2000 key=[0]",
        ]);
        let optimized = parsed(&input);
        assert!(matches!(
            optimized,
            Command::CreateDataflow { optimize: true, .. }
        ));
    }

    /// The sink export kinds parse: a materialized view with its target shard and
    /// a subscribe. `copy-to` is refused.
    #[mz_ore::test]
    fn parses_sink_export_kinds() {
        let mv = block(&[
            "create-dataflow name=mv as-of=0",
            "  import source=1000 shard=r upper=1",
            "  build id=2000",
            "    Get u1000",
            "  export kind=materialized-view sink=2001 on=2000 shard=out schema=kv",
        ]);
        assert_eq!(
            exports_of(&mv),
            vec![ExportSpec::MaterializedView {
                sink_id: 2001,
                on_id: 2000,
                shard: "out".to_string(),
                schema: Some("kv".to_string()),
            }]
        );

        let sub = block(&[
            "create-dataflow name=sub as-of=0",
            "  import source=1000 shard=s upper=2",
            "  build id=2000",
            "    Get u1000",
            "  export kind=subscribe sink=2001 on=2000 up-to=2",
        ]);
        assert_eq!(
            exports_of(&sub),
            vec![ExportSpec::Subscribe {
                sink_id: 2001,
                on_id: 2000,
                schema: None,
                up_to: Some(2),
            }]
        );

        // The kind is known by name, but there is no implementation behind it.
        let copy = block(&[
            "create-dataflow name=c as-of=0",
            "  import source=1000 shard=s upper=1",
            "  build id=2000",
            "    Get u1000",
            "  export kind=copy-to sink=2001 on=2000",
        ]);
        assert!(parse_command(&copy).is_err());
    }

    /// `create-instance` reads its optional knobs, with defaults, and
    /// `update-configuration` reads a `name type value` table that may be empty.
    #[mz_ore::test]
    fn parses_handshake_config() {
        assert_eq!(
            parsed("create-instance"),
            Command::CreateInstance {
                expiration_offset: None,
                arrangement_dictionary_compression: false,
            }
        );
        assert_eq!(
            parsed("create-instance expiration-offset=30s arrangement-dictionary-compression=true"),
            Command::CreateInstance {
                expiration_offset: Some("30s".to_string()),
                arrangement_dictionary_compression: true,
            }
        );

        assert_eq!(
            parsed("update-configuration"),
            Command::UpdateConfiguration { updates: vec![] }
        );
        let table = block(&[
            "update-configuration",
            "  enable_my_flag bool true",
            "  my_dur duration 1s",
        ]);
        assert_eq!(
            parsed(&table),
            Command::UpdateConfiguration {
                updates: vec![
                    setting("enable_my_flag", "bool", "true"),
                    setting("my_dur", "duration", "1s"),
                ],
            }
        );
    }

    /// `await-subscribe` reads all of its arguments.
    #[mz_ore::test]
    fn parses_await_subscribe() {
        let expected = Command::AwaitSubscribe {
            id: 2001,
            up_to: 2,
            timeout_secs: Some(5),
        };
        assert_eq!(
            parsed("await-subscribe id=2001 up-to=2 timeout-secs=5"),
            expected
        );
    }

    /// A file is split into stanzas with comments and blank lines retained, and a
    /// rewrite with unchanged outputs gives back the original text.
    #[mz_ore::test]
    fn parses_and_rewrites_file() {
        let content = [
            "# a comment",
            "schedule id=1001",
            "----",
            "ok",
            "",
            "count id=1001 ts=5",
            "----",
            "10000",
            "",
        ]
        .join("\n");
        let items = parse_file(&content).unwrap();

        let mut stanzas = Vec::new();
        for item in &items {
            if let Item::Stanza(stanza) = item {
                stanzas.push(stanza);
            }
        }
        assert_eq!(stanzas.len(), 2);
        assert_eq!(stanzas[0].command, Command::Schedule { id: 1001 });
        assert_eq!(stanzas[0].expected, "ok");
        assert_eq!(stanzas[1].expected, "10000");

        // Feeding the expected outputs back in must be a no-op on the file.
        let actuals = vec!["ok".to_string(), "10000".to_string()];
        assert_eq!(rewrite(&items, &actuals), content);
    }
}