Skip to main content

mz_clusterd_test_driver/
script.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Executes a text command script against `clusterd`.
11//!
12//! Instead of recompiling a Rust scenario, a test (or an agent) writes a
13//! [`crate::text`] script: a sequence of commands, each with an expected output
14//! block (`----`) that is the assertion. The coarse orchestration verbs map almost
15//! directly to [`Driver`] calls; `define` carries arbitrary MIR (as pretty-form
16//! specs parsed by `mz-expr-parser`, the `.spec` test syntax) over the full
17//! [`DataflowBuilder`] surface, including index imports, while `define_index`
18//! stays as sugar for the common single-index shape. Explicit `write_rows`
19//! payloads are typed against the schema token-by-token via `cell_from_token`
20//! (reusing `mz_repr::strconv`) rather than `Row`'s opaque serde.
21//!
22//! # Execution
23//!
24//! [`run`] parses the script, executes each command, and compares its golden
25//! output to the expected block — failing the run on a mismatch, or rewriting the
26//! file when `REWRITE` is set. A command that fails renders as `error: <message>`,
27//! so an expected failure is asserted by its golden block. Assertions are
28//! level-triggered waits on monotonic frontiers, so a single sequential script is
29//! deterministic regardless of how the dataflows interleave.
30//!
31//! Shards are referenced by a string alias; the first command naming an alias
32//! allocates a fresh [`ShardId`] for it. Object ids are raw `u64`s mapped to
33//! [`GlobalId::User`].
34
35use std::collections::BTreeMap;
36use std::path::Path;
37use std::time::Duration;
38
39use anyhow::Context;
40use mz_compute_client::protocol::command::{ComputeCommand, PeekTarget};
41use mz_dyncfg::{ConfigType, ConfigUpdates, ConfigVal};
42use mz_expr::visit::Visit;
43use mz_expr::{Id, MirRelationExpr};
44use mz_expr_parser::{TestCatalog, try_parse_mir};
45use mz_persist_client::PersistClient;
46use mz_persist_types::{PersistLocation, ShardId};
47use mz_repr::{
48    GlobalId, RelationDesc, ReprRelationType, Row, SqlColumnType, SqlRelationType, SqlScalarType,
49    Timestamp, strconv,
50};
51use mz_storage_types::controller::CollectionMetadata;
52use serde::{Deserialize, Serialize};
53use timely::progress::Antichain;
54
55use crate::data::{
56    Cell, pack_cells, sample_desc, synth_rows, write_rows_single_ts, write_rows_spread,
57};
58use crate::dataflow::{
59    DataflowBuilder, PersistSink, PersistSource, count_over_index, index_dataflow,
60};
61use crate::driver::Driver;
62
/// The default payload padding (bytes) for synthetic rows when a command omits
/// its `row_bytes` field.
const DEFAULT_ROW_BYTES: usize = 64;
/// The default timeout (seconds) for `await_frontier` when a command omits it.
///
/// Also bounds the hydration wait inside `ScriptState::count_via_reduce`, which
/// has no timeout parameter of its own.
const DEFAULT_TIMEOUT_SECS: u64 = 600;
67
/// A column declaration in a `define_schema` command.
///
/// `relation_desc` turns an ordered list of these into a [`RelationDesc`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ColumnSpec {
    /// Column name.
    pub name: String,
    /// Scalar type name; see `scalar_type_from_str`. Serialized under the key
    /// `type` (a Rust keyword, hence the differently-named field).
    #[serde(rename = "type")]
    pub ty: String,
    /// Whether the column admits `NULL`. Defaults to `false` when omitted.
    #[serde(default)]
    pub nullable: bool,
}
80
/// A single dyncfg update in an `update-configuration` command: a config name, a
/// type tag selecting how `value` is parsed (`bool`/`u32`/`usize`/`f64`/`string`/
/// `duration`), and the value. Typed against [`mz_dyncfg`] at execution.
///
/// Both the tag and the value are kept as raw strings here; `parse_config_val`
/// is what interprets them, so an unknown tag or malformed value is only
/// reported when the setting is parsed, not when the script is deserialized.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConfigSetting {
    /// The dyncfg name (sent to the replica by name; unknown names are ignored).
    pub name: String,
    /// The type tag selecting the [`ConfigVal`] variant. Serialized under the key
    /// `type`.
    #[serde(rename = "type")]
    pub ty: String,
    /// The value, parsed against `ty`.
    pub value: String,
}
94
/// A collection to import into a generic dataflow (the `imports` list of
/// [`Command::CreateDataflow`]): a persist source or an existing index.
/// Externally tagged: `{"source": {…}}` or `{"index": {…}}`.
///
/// The container-level `rename_all` only lowercases the variant names; the
/// field names inside each variant are used as written.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ImportSpec {
    /// Import a persist-backed storage collection, as `define_index` does.
    Source {
        /// The imported source's global id.
        id: u64,
        /// Shard alias to import; allocated on first use.
        shard: String,
        /// Schema name; defaults to the built-in sample schema.
        #[serde(default)]
        schema: Option<String>,
        /// The shard's exclusive write upper (see `PersistSource::upper`).
        upper: u64,
    },
    /// Import an existing index by its global id; its arranged collection, key,
    /// and type are taken from the registry, so it must have been defined first.
    Index {
        /// The index's global id.
        index_id: u64,
    },
}
119
/// A MIR object to build in a generic dataflow (one entry of the `builds` list
/// of [`Command::CreateDataflow`]).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BuildSpec {
    /// The built object's global id.
    pub id: u64,
    /// The computation, as a pretty-form MIR spec parsed by `mz-expr-parser`
    /// (e.g. `Reduce aggregates=[count(*)]` over `Get u1000`). It references
    /// imported or previously-built objects by their global-id name (`u<n>`); the
    /// leaf `Get`'s type is resolved from the import, not authored.
    pub expr: String,
}
131
/// An export in a `create-dataflow` command, mirroring the export kinds a real
/// dataflow produces (see [`mz_compute_types::sinks::ComputeSinkConnection`]).
/// `copy-to` is intentionally absent: the parser rejects it as unimplemented.
///
/// Serialized externally tagged with kebab-case variant names (`index`,
/// `materialized-view`, `subscribe`). The container-level `rename_all` applies
/// to the variant names only, so the fields keep their snake_case spelling
/// (`index_id`, `on_id`, `sink_id`, `up_to`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum ExportSpec {
    /// An arrangement, peekable as an index and importable by later dataflows.
    Index {
        /// The exported index's global id.
        index_id: u64,
        /// The imported or built id the index arranges.
        on_id: u64,
        /// Columns to arrange by.
        key: Vec<usize>,
    },
    /// A persist sink writing the collection to a shard (a materialized view),
    /// verified by reading the shard back with a persist `peek` of the sink id.
    MaterializedView {
        /// The sink's global id (scheduled and frontier-tracked under this id).
        sink_id: u64,
        /// The imported or built id the sink writes.
        on_id: u64,
        /// Target shard alias; allocated on first use.
        shard: String,
        /// Output schema; defaults to the sample schema. Must match `on_id`'s type.
        /// May be omitted: serde reads a missing `Option` field as `None`.
        schema: Option<String>,
    },
    /// A subscribe sink streaming changes back as responses, collected by
    /// `await-subscribe`.
    Subscribe {
        /// The sink's global id.
        sink_id: u64,
        /// The imported or built id the sink streams.
        on_id: u64,
        /// Output schema; defaults to the sample schema. Must match `on_id`'s type.
        /// May be omitted: serde reads a missing `Option` field as `None`.
        schema: Option<String>,
        /// Exclusive upper at which the subscribe completes; unbounded if absent.
        up_to: Option<u64>,
    },
}
172
173/// Map a JSON type name to a [`SqlScalarType`]. The supported set is intentionally
174/// small and matches [`crate::data::Cell`]; extend both together.
175fn scalar_type_from_str(s: &str) -> anyhow::Result<SqlScalarType> {
176    Ok(match s.to_ascii_lowercase().as_str() {
177        "int16" | "smallint" => SqlScalarType::Int16,
178        "int32" | "int" | "integer" => SqlScalarType::Int32,
179        "int64" | "bigint" => SqlScalarType::Int64,
180        "bool" | "boolean" => SqlScalarType::Bool,
181        "string" | "text" => SqlScalarType::String,
182        "bytes" | "bytea" => SqlScalarType::Bytes,
183        other => anyhow::bail!("unsupported column type {other:?}"),
184    })
185}
186
187/// Parse a configuration value string into a [`ConfigVal`], selecting the variant
188/// by a type tag. Each numeric/bool/duration type reuses [`mz_dyncfg`]'s own
189/// `ConfigType::parse` (so the accepted syntax — e.g. `on`/`off` for bool, humantime
190/// for duration — matches the rest of the codebase); `string` is taken verbatim.
191fn parse_config_val(ty: &str, value: &str) -> anyhow::Result<ConfigVal> {
192    let err = |e: String| anyhow::anyhow!("config value {value:?} is not a valid {ty}: {e}");
193    Ok(match ty {
194        "bool" => <bool as ConfigType>::parse(value).map_err(err)?.into(),
195        "u32" => <u32 as ConfigType>::parse(value).map_err(err)?.into(),
196        "usize" => <usize as ConfigType>::parse(value).map_err(err)?.into(),
197        "f64" => <f64 as ConfigType>::parse(value).map_err(err)?.into(),
198        "duration" => <Duration as ConfigType>::parse(value).map_err(err)?.into(),
199        "string" => ConfigVal::String(value.to_string()),
200        other => anyhow::bail!(
201            "unsupported config type {other:?}; use bool/u32/usize/f64/duration/string"
202        ),
203    })
204}
205
206/// Build a [`RelationDesc`] from column specs.
207fn relation_desc(columns: &[ColumnSpec]) -> anyhow::Result<RelationDesc> {
208    let mut builder = RelationDesc::builder();
209    for col in columns {
210        builder = builder.with_column(
211            col.name.as_str(),
212            SqlColumnType {
213                scalar_type: scalar_type_from_str(&col.ty)?,
214                nullable: col.nullable,
215            },
216        );
217    }
218    Ok(builder.finish())
219}
220
/// Remove one pair of enclosing double quotes from a token.
///
/// The token is returned unchanged unless it both starts and ends with `"`,
/// using two distinct quote characters: a lone `"` is left as is.
fn unquote(s: &str) -> &str {
    match s.strip_prefix('"') {
        // A leading quote only counts if a closing quote follows it.
        Some(rest) => rest.strip_suffix('"').unwrap_or(s),
        None => s,
    }
}
227
228/// Type a raw row-value token against its column into an owned [`Cell`].
229///
230/// The bare token `null` is SQL `NULL` (only in a nullable column); quote it
231/// (`"null"`) for the literal string. Numeric and boolean tokens go through
232/// [`mz_repr::strconv`] — the canonical PostgreSQL-compatible text parser the rest
233/// of the codebase uses — so the accepted syntax matches `mz_pgrepr`'s text decode.
234/// `string`/`bytes` columns take the (unquoted) token verbatim; `bytes` is its
235/// UTF-8 encoding.
236fn cell_from_token(token: &str, col: &SqlColumnType) -> anyhow::Result<Cell> {
237    if token == "null" {
238        anyhow::ensure!(col.nullable, "null value in non-nullable column");
239        return Ok(Cell::Null);
240    }
241    let parse =
242        |kind: &str, e: strconv::ParseError| anyhow::anyhow!("parsing {token:?} as {kind}: {e}");
243    let cell = match col.scalar_type {
244        SqlScalarType::Int16 => {
245            Cell::Int16(strconv::parse_int16(token).map_err(|e| parse("int16", e))?)
246        }
247        SqlScalarType::Int32 => {
248            Cell::Int32(strconv::parse_int32(token).map_err(|e| parse("int32", e))?)
249        }
250        SqlScalarType::Int64 => {
251            Cell::Int64(strconv::parse_int64(token).map_err(|e| parse("int64", e))?)
252        }
253        SqlScalarType::Bool => {
254            Cell::Bool(strconv::parse_bool(token).map_err(|e| parse("bool", e))?)
255        }
256        SqlScalarType::String => Cell::Str(unquote(token).to_string()),
257        SqlScalarType::Bytes => Cell::Bytes(unquote(token).as_bytes().to_vec()),
258        ref other => anyhow::bail!("unsupported column type {other:?}"),
259    };
260    Ok(cell)
261}
262
263/// Pack explicit row tokens against `desc`, validating arity per row.
264fn rows_from_tokens(desc: &RelationDesc, rows: &[Vec<String>]) -> anyhow::Result<Vec<Row>> {
265    let cols: Vec<&SqlColumnType> = desc.iter_types().collect();
266    let mut out = Vec::with_capacity(rows.len());
267    for (r, row) in rows.iter().enumerate() {
268        anyhow::ensure!(
269            row.len() == cols.len(),
270            "row {r} has {} values but schema has {} columns",
271            row.len(),
272            cols.len()
273        );
274        // Arity validated above, so indexing `cols` by position is in bounds.
275        let cells = row
276            .iter()
277            .enumerate()
278            .map(|(c, v)| cell_from_token(v, cols[c]))
279            .collect::<anyhow::Result<Vec<Cell>>>()?;
280        out.push(pack_cells(&cells));
281    }
282    Ok(out)
283}
284
285/// Register a referenceable object in the parser `catalog` under its global-id
286/// name (e.g. `u1000`), recording the name-to-id mapping so the parsed `Get`s —
287/// which carry the catalog's own assigned ids — can be remapped back to the
288/// script's ids.
289fn register_catalog_object(
290    catalog: &mut TestCatalog,
291    name_to_id: &mut BTreeMap<String, GlobalId>,
292    id: GlobalId,
293    sql_typ: SqlRelationType,
294) -> anyhow::Result<()> {
295    let name = id.to_string();
296    // Column names are only used for display; `Get` references columns by `#n`,
297    // so synthetic `c0..cN` names suffice.
298    let cols = (0..sql_typ.column_types.len())
299        .map(|i| format!("c{i}"))
300        .collect();
301    catalog
302        .insert(&name, cols, sql_typ, false)
303        .map_err(|e| anyhow::anyhow!("registering {name} in catalog: {e}"))?;
304    name_to_id.insert(name, id);
305    Ok(())
306}
307
308/// Rewrite every global `Get` in `expr` from the catalog's assigned id back to
309/// the script's id, looked up by the object's name.
310fn remap_gets(
311    expr: &mut MirRelationExpr,
312    catalog: &TestCatalog,
313    name_to_id: &BTreeMap<String, GlobalId>,
314) -> anyhow::Result<()> {
315    expr.try_visit_mut_post::<_, anyhow::Error>(&mut |e| {
316        if let MirRelationExpr::Get {
317            id: Id::Global(g), ..
318        } = e
319        {
320            let name = catalog
321                .get_source_name(g)
322                .ok_or_else(|| anyhow::anyhow!("get of unknown catalog object {g}"))?;
323            let id = name_to_id
324                .get(name)
325                .ok_or_else(|| anyhow::anyhow!("get of unregistered object {name}"))?;
326            *g = *id;
327        }
328        Ok(())
329    })
330}
331
/// A command read from the script stream.
///
/// Internally tagged on `"cmd"` with snake_case variant names; the variant's
/// fields sit alongside the tag, e.g.
/// `{"cmd":"write_single_ts","shard":"s1","ts":0,"count":1000}`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "cmd", rename_all = "snake_case")]
pub enum Command {
    /// Declare a named relation schema for later `schema` references.
    DefineSchema {
        /// Schema name, referenced by `schema` fields on other commands.
        name: String,
        /// Ordered column declarations.
        columns: Vec<ColumnSpec>,
    },
    /// Write `count` synthetic rows to `shard` at a single timestamp `ts`.
    WriteSingleTs {
        /// Shard alias; allocated on first use.
        shard: String,
        /// Schema name; defaults to the built-in `(bigint, text)` sample schema.
        #[serde(default)]
        schema: Option<String>,
        /// The timestamp to write at.
        ts: u64,
        /// Number of synthetic rows to write.
        count: u64,
        /// First synthetic row index, so successive batches can use disjoint id
        /// ranges (`start..start + count`) that never consolidate. Defaults to 0.
        #[serde(default)]
        start: u64,
        /// Payload padding per row; defaults to `DEFAULT_ROW_BYTES`.
        #[serde(default)]
        row_bytes: Option<usize>,
    },
    /// Write `count` synthetic rows to `shard`, spread across `n_ts` timestamps in a
    /// single append.
    WriteSpread {
        /// Shard alias; allocated on first use.
        shard: String,
        /// Schema name; defaults to the built-in sample schema.
        #[serde(default)]
        schema: Option<String>,
        /// Number of synthetic rows to write.
        count: u64,
        /// Number of distinct timestamps to spread the rows across.
        n_ts: u64,
        /// First synthetic row index (see [`Command::WriteSingleTs`]). Defaults to 0.
        #[serde(default)]
        start: u64,
        /// Payload padding per row; defaults to `DEFAULT_ROW_BYTES`.
        #[serde(default)]
        row_bytes: Option<usize>,
    },
    /// Write explicit rows to `shard` at a single timestamp `ts`. Each row is an
    /// array of string tokens, one per schema column, in column order.
    WriteRows {
        /// Shard alias; allocated on first use.
        shard: String,
        /// Schema name; defaults to the built-in sample schema.
        #[serde(default)]
        schema: Option<String>,
        /// The timestamp to write at.
        ts: u64,
        /// Rows as arrays of raw value tokens, typed against the schema by
        /// `cell_from_token`.
        rows: Vec<Vec<String>>,
    },
    /// Submit (without scheduling) an index dataflow over `shard`.
    DefineIndex {
        /// The imported source's global id.
        source_id: u64,
        /// The exported index's global id.
        index_id: u64,
        /// Shard alias to import; must already exist.
        shard: String,
        /// Schema name; defaults to the built-in sample schema. Must match what was
        /// written to `shard`.
        #[serde(default)]
        schema: Option<String>,
        /// Columns to arrange by.
        key: Vec<usize>,
        /// The dataflow's `as_of`.
        as_of: u64,
        /// The shard's exclusive write upper (see `PersistSource::upper`).
        upper: u64,
    },
    /// Schedule a previously-submitted collection so it makes progress.
    Schedule {
        /// The collection's global id.
        id: u64,
    },
    /// Advance an index's read frontier (`since`) via `AllowCompaction`.
    AllowCompaction {
        /// The index's global id.
        id: u64,
        /// The new read frontier.
        frontier: u64,
    },
    /// Take a collection out of read-only mode via `AllowWrites`, letting its
    /// persist sink begin writing. Every dataflow starts read-only; indexes,
    /// subscribes, and peeks work regardless, but a materialized-view sink withholds
    /// all persist writes until this is sent for its sink id.
    AllowWrites {
        /// The sink's global id.
        id: u64,
    },
    /// Wait until `id`'s output frontier reaches `ts`, or fail after the timeout.
    AwaitFrontier {
        /// The collection's global id.
        id: u64,
        /// The target output-frontier timestamp.
        ts: u64,
        /// Timeout in seconds; defaults to `DEFAULT_TIMEOUT_SECS`.
        #[serde(default)]
        timeout_secs: Option<u64>,
        /// If true, a timeout is reported (`status: timeout`) without failing the
        /// run. Used by reproductions where not reaching the frontier is an
        /// expected outcome, not an assertion failure.
        #[serde(default)]
        allow_timeout: bool,
    },
    /// Count `id`'s rows at `ts` and emit the count.
    ///
    /// Sugar over a `Reduce`: builds an ephemeral dataflow that index-imports `id`,
    /// computes `count(*)` over it, and peeks the single-row result — so the count
    /// runs through a real reduce operator rather than being tallied in the driver.
    /// `id` must have been registered by a prior `define_index` (or `define`
    /// export). The golden output is the count; the script's `----` block asserts it.
    Count {
        /// The index's global id.
        id: u64,
        /// The timestamp to count at.
        ts: u64,
    },
    /// Submit (without scheduling) a dataflow built from generic MIR — the
    /// abstraction behind index / materialized-view / subscribe / copy-to.
    ///
    /// A projection of [`DataflowBuilder`]: import sources and/or existing indexes,
    /// build MIR objects (each a pretty-form MIR spec; see [`BuildSpec`]), and
    /// export over them. Exports are index, materialized-view, or subscribe (see
    /// [`ExportSpec`]); copy-to is not implemented. Exported indexes are registered
    /// for later import or count assertion; subscribe sinks register a response
    /// buffer for `await-subscribe`. `define-index` is sugar over this. The optional
    /// `optimize` flag runs the MIR optimizer before lowering (needed for joins).
    CreateDataflow {
        /// Debug name for the dataflow; defaults to `headless-create-dataflow`.
        #[serde(default)]
        name: Option<String>,
        /// Collections to import (persist sources and/or existing indexes).
        #[serde(default)]
        imports: Vec<ImportSpec>,
        /// MIR objects to compute, each bound to an id.
        #[serde(default)]
        builds: Vec<BuildSpec>,
        /// Exports over imported or built ids.
        #[serde(default)]
        exports: Vec<ExportSpec>,
        /// The dataflow's `as_of`.
        as_of: u64,
        /// Run the MIR optimizer before lowering (needed for e.g. joins). Off by
        /// default, so the caller's MIR is lowered faithfully.
        #[serde(default)]
        optimize: bool,
    },
    /// Peek `id` at `ts` and emit the returned rows (sorted, one per line). The
    /// generic output assertion: the script's `----` block holds the expected rows.
    Peek {
        /// The index's global id.
        id: u64,
        /// Schema name describing the peek's output; defaults to the sample schema.
        #[serde(default)]
        schema: Option<String>,
        /// The timestamp to peek at.
        ts: u64,
    },
    /// Wait for subscribe sink `id`'s upper to reach `up_to`, then emit its
    /// accumulated updates as `<ts> <diff> <datums>` lines (consolidated, sorted).
    /// The output assertion for a subscribe sink.
    AwaitSubscribe {
        /// The subscribe sink's global id.
        id: u64,
        /// The exclusive upper to wait for (typically the sink's `up_to`).
        up_to: u64,
        /// Timeout in seconds; defaults to `DEFAULT_TIMEOUT_SECS`. May be omitted:
        /// serde reads a missing `Option` field as `None`.
        timeout_secs: Option<u64>,
    },
    /// Send `CreateInstance`, opening the compute instance (and the reconciliation
    /// window). The settable [`InstanceConfig`] knobs default to the values a plain
    /// `create-instance` supplies; the peek-stash location is always the host's, and
    /// the peek-response stash is force-disabled (see `Driver::create_instance`).
    ///
    /// [`InstanceConfig`]: mz_compute_client::protocol::command::InstanceConfig
    CreateInstance {
        /// Replica expiration offset (a duration like `30s`); none if absent.
        #[serde(default)]
        expiration_offset: Option<String>,
        /// Whether arrangements use dictionary compression.
        #[serde(default)]
        arrangement_dictionary_compression: bool,
    },
    /// Send `UpdateConfiguration` with a table of dyncfg updates (`name type value`
    /// rows). Generic over any configuration; the peek-response stash is not settable
    /// here (it is force-disabled at instance creation).
    UpdateConfiguration {
        /// The dyncfg updates to apply.
        #[serde(default)]
        updates: Vec<ConfigSetting>,
    },
    /// Drop the current connection and reconnect, sending only `Hello`. Re-issue
    /// `create_instance`, replay the dataflows the replica should keep, then
    /// `initialization_complete` to close the reconciliation window.
    Reconnect,
    /// Send `InitializationComplete`, closing the reconciliation window.
    InitializationComplete,
}
546
/// What the registry remembers about an exported index, so a later dataflow
/// import or count assertion can reconstruct the import without re-declaring it.
///
/// `ScriptState::count_via_reduce` reads all three fields to build its
/// index-importing dataflow.
struct IndexEntry {
    /// The id of the collection the index arranges.
    on_id: u64,
    /// The columns the index is arranged by.
    key: Vec<usize>,
    /// The arranged collection's relation type (for `import_index`).
    on_type: ReprRelationType,
}
557
/// The base for ephemeral global ids the count sugar allocates. Far above any
/// id a script would use, so its dataflows never collide with user objects.
///
/// `ScriptState::alloc_internal` hands ids out upward from this value and wraps
/// each in [`GlobalId::User`]. That is the same id space script objects use,
/// which is why the base has to sit well clear of them.
const INTERNAL_ID_BASE: u64 = u64::MAX / 2;
561
/// Mutable state threaded through a script run.
///
/// Holds the connection to the replica plus the registries that let later
/// commands refer to things earlier commands created (schemas by name, shards
/// by alias, indexes and materialized-view sinks by global id).
pub struct ScriptState {
    /// The driver that commands are issued through (dataflow submission,
    /// scheduling, frontier waits, peeks).
    driver: Driver,
    /// The persist client opened from the driver's host in [`ScriptState::new`];
    /// used by the write commands to append rows to shards.
    client: PersistClient,
    /// The persist location handed to [`ScriptState::new`].
    // NOTE(review): presumably used to build the `CollectionMetadata` for sources
    // and sinks; the use sites are outside this view — confirm.
    loc: PersistLocation,
    /// Named schemas declared via `define_schema`.
    schemas: BTreeMap<String, RelationDesc>,
    /// Alias-to-shard map; aliases are allocated lazily on first use.
    shards: BTreeMap<String, ShardId>,
    /// Exported indexes, by global id, for later import / count assertions.
    indexes: BTreeMap<u64, IndexEntry>,
    /// Materialized-view sink outputs, by sink global id: the target shard's
    /// metadata, so a `peek` of the sink id reads its shard via a persist peek.
    mv_outputs: BTreeMap<u64, CollectionMetadata>,
    /// Next ephemeral id for the count sugar's dataflows.
    next_internal: u64,
}
579
580impl ScriptState {
581    /// Build the state from a connected driver and its persist location, opening a
582    /// persist client.
583    pub async fn new(driver: Driver, loc: PersistLocation) -> anyhow::Result<Self> {
584        let client = driver.host.client().await?;
585        Ok(ScriptState {
586            driver,
587            client,
588            loc,
589            schemas: BTreeMap::new(),
590            shards: BTreeMap::new(),
591            indexes: BTreeMap::new(),
592            mv_outputs: BTreeMap::new(),
593            next_internal: INTERNAL_ID_BASE,
594        })
595    }
596
597    /// Resolve a shard alias, allocating a fresh [`ShardId`] on first use.
598    fn shard_id(&mut self, alias: &str) -> ShardId {
599        *self
600            .shards
601            .entry(alias.to_string())
602            .or_insert_with(ShardId::new)
603    }
604
605    /// Allocate a fresh ephemeral global id for an internally-built dataflow.
606    fn alloc_internal(&mut self) -> GlobalId {
607        let id = self.next_internal;
608        self.next_internal += 1;
609        GlobalId::User(id)
610    }
611
612    /// Count the rows of a registered index at `ts` by running a `count(*)`
613    /// `Reduce` over it: build an ephemeral dataflow that index-imports `index_id`,
614    /// schedule and hydrate it, then peek its single-row output. An empty result
615    /// (the reduce emits no row over empty input) reads as a count of `0`.
616    async fn count_via_reduce(&mut self, index_id: u64, ts: u64) -> anyhow::Result<u64> {
617        let entry = self.indexes.get(&index_id).ok_or_else(|| {
618            anyhow::anyhow!("unknown index {index_id}; define it with define_index first")
619        })?;
620        let on_id = entry.on_id;
621        let key = entry.key.clone();
622        let on_type = entry.on_type.clone();
623
624        let reduce_id = self.alloc_internal();
625        let out_index_id = self.alloc_internal();
626        let df = count_over_index(
627            GlobalId::User(index_id),
628            GlobalId::User(on_id),
629            on_type,
630            key,
631            reduce_id,
632            out_index_id,
633            Timestamp::from(ts),
634        )?;
635        self.driver.submit_dataflow(df)?;
636        self.driver.schedule(out_index_id)?;
637        // The count is final once the output frontier passes `ts`.
638        self.driver
639            .expect_frontier(
640                out_index_id,
641                Timestamp::from(ts).step_forward(),
642                Duration::from_secs(DEFAULT_TIMEOUT_SECS),
643            )
644            .await?;
645
646        // The reduce output is a single non-null bigint column.
647        let count_desc = RelationDesc::builder()
648            .with_column(
649                "count",
650                SqlColumnType {
651                    scalar_type: SqlScalarType::Int64,
652                    nullable: false,
653                },
654            )
655            .finish();
656        let rows = self
657            .driver
658            .peek(
659                PeekTarget::Index { id: out_index_id },
660                count_desc,
661                Timestamp::from(ts),
662            )
663            .await?;
664        match rows.as_slice() {
665            // No row over empty input: count is zero.
666            [] => Ok(0),
667            [row] => {
668                let count = row.unpack_first().unwrap_int64();
669                Ok(u64::try_from(count)?)
670            }
671            other => anyhow::bail!(
672                "count reduce returned {} rows, expected 0 or 1",
673                other.len()
674            ),
675        }
676    }
677
678    /// Resolve a schema name, defaulting to the built-in sample schema when absent.
679    fn resolve_schema(&self, name: &Option<String>) -> anyhow::Result<RelationDesc> {
680        match name {
681            None => Ok(sample_desc()),
682            Some(name) => self.schemas.get(name).cloned().ok_or_else(|| {
683                anyhow::anyhow!("unknown schema {name:?}; declare it with define_schema first")
684            }),
685        }
686    }
687
688    /// Validate that a sink's declared output schema `desc` matches the column types
689    /// of the object `on_id` it exports. Compares column types (not inferred keys),
690    /// so a mismatched arity or type fails before the dataflow is submitted.
691    fn check_sink_schema(
692        &self,
693        builder: &DataflowBuilder,
694        on_id: u64,
695        desc: &RelationDesc,
696    ) -> anyhow::Result<()> {
697        let on_type = builder.get(GlobalId::User(on_id))?.typ();
698        let want = ReprRelationType::from(desc.typ());
699        anyhow::ensure!(
700            on_type.column_types == want.column_types,
701            "sink output schema does not match object {on_id}: \
702             declared {:?}, object is {:?}",
703            want.column_types,
704            on_type.column_types
705        );
706        Ok(())
707    }
708
709    /// Execute a single command, returning its golden output text.
710    pub async fn execute(&mut self, cmd: Command) -> anyhow::Result<String> {
711        match cmd {
712            Command::DefineSchema { name, columns } => {
713                let desc = relation_desc(&columns)?;
714                self.schemas.insert(name, desc);
715                Ok("ok".to_string())
716            }
717            Command::WriteSingleTs {
718                shard,
719                schema,
720                ts,
721                count,
722                start,
723                row_bytes,
724            } => {
725                let desc = self.resolve_schema(&schema)?;
726                let shard = self.shard_id(&shard);
727                let pad = row_bytes.unwrap_or(DEFAULT_ROW_BYTES);
728                let batch = synth_rows(&desc, start, count, pad);
729                write_rows_single_ts(&self.client, shard, &desc, &batch, Timestamp::from(ts))
730                    .await?;
731                Ok(format!("wrote {count}"))
732            }
733            Command::WriteSpread {
734                shard,
735                schema,
736                count,
737                n_ts,
738                start,
739                row_bytes,
740            } => {
741                let desc = self.resolve_schema(&schema)?;
742                let shard = self.shard_id(&shard);
743                let pad = row_bytes.unwrap_or(DEFAULT_ROW_BYTES);
744                let batch = synth_rows(&desc, start, count, pad);
745                write_rows_spread(&self.client, shard, &desc, &batch, n_ts).await?;
746                Ok(format!("wrote {count}"))
747            }
748            Command::WriteRows {
749                shard,
750                schema,
751                ts,
752                rows,
753            } => {
754                let desc = self.resolve_schema(&schema)?;
755                let batch = rows_from_tokens(&desc, &rows)?;
756                let written = batch.len();
757                let shard = self.shard_id(&shard);
758                write_rows_single_ts(&self.client, shard, &desc, &batch, Timestamp::from(ts))
759                    .await?;
760                Ok(format!("wrote {written}"))
761            }
762            Command::DefineIndex {
763                source_id,
764                index_id,
765                shard,
766                schema,
767                key,
768                as_of,
769                upper,
770            } => {
771                let desc = self.resolve_schema(&schema)?;
772                // Validate the key columns against the schema up front, so a bad
773                // index (e.g. key past the last column) yields a clean error rather
774                // than reaching the lowering with an out-of-range column reference.
775                let arity = desc.arity();
776                for &col in &key {
777                    anyhow::ensure!(
778                        col < arity,
779                        "key column {col} out of range for a {arity}-column schema"
780                    );
781                }
782                let shard = self.shard_id(&shard);
783                let on_type = ReprRelationType::from(desc.typ());
784                let df = index_dataflow(
785                    GlobalId::User(source_id),
786                    GlobalId::User(index_id),
787                    shard,
788                    self.loc.clone(),
789                    desc,
790                    key.clone(),
791                    Timestamp::from(as_of),
792                    Timestamp::from(upper),
793                )?;
794                self.driver.submit_dataflow(df)?;
795                // Register only after a successful submit, so a rejected index is
796                // not later importable or countable.
797                self.indexes.insert(
798                    index_id,
799                    IndexEntry {
800                        on_id: source_id,
801                        key,
802                        on_type,
803                    },
804                );
805                Ok("ok".to_string())
806            }
807            Command::Schedule { id } => {
808                self.driver.schedule(GlobalId::User(id))?;
809                Ok("ok".to_string())
810            }
811            Command::AllowCompaction { id, frontier } => {
812                self.driver.send(ComputeCommand::AllowCompaction {
813                    id: GlobalId::User(id),
814                    frontier: Antichain::from_elem(Timestamp::from(frontier)),
815                })?;
816                Ok("ok".to_string())
817            }
818            Command::AllowWrites { id } => {
819                self.driver
820                    .send(ComputeCommand::AllowWrites(GlobalId::User(id)))?;
821                Ok("ok".to_string())
822            }
823            Command::AwaitFrontier {
824                id,
825                ts,
826                timeout_secs,
827                allow_timeout,
828            } => {
829                let timeout = Duration::from_secs(timeout_secs.unwrap_or(DEFAULT_TIMEOUT_SECS));
830                let result = self
831                    .driver
832                    .expect_frontier(GlobalId::User(id), Timestamp::from(ts), timeout)
833                    .await;
834                if allow_timeout {
835                    // The outcome is intentionally unobserved: emit a fixed token so
836                    // the golden output stays deterministic whether or not the
837                    // frontier was reached (see `multi_dataflow`, whose hydration is
838                    // nondeterministic).
839                    Ok("awaited".to_string())
840                } else {
841                    result?;
842                    Ok("ok".to_string())
843                }
844            }
845            Command::Count { id, ts } => {
846                let count = self.count_via_reduce(id, ts).await?;
847                Ok(count.to_string())
848            }
849            Command::CreateDataflow {
850                name,
851                imports,
852                builds,
853                exports,
854                as_of,
855                optimize,
856            } => {
857                let mut builder = DataflowBuilder::new(
858                    name.unwrap_or_else(|| "headless-create-dataflow".to_string()),
859                );
860                if optimize {
861                    builder.optimize();
862                }
863                // The parser's catalog resolves `Get u<n>` leaves by name; it
864                // assigns its own global ids, so we keep a name->our-id map and
865                // remap the parsed `Get`s back to the script's ids afterwards.
866                let mut catalog = TestCatalog::default();
867                let mut name_to_id: BTreeMap<String, GlobalId> = BTreeMap::new();
868                for import in imports {
869                    match import {
870                        ImportSpec::Source {
871                            id,
872                            shard,
873                            schema,
874                            upper,
875                        } => {
876                            let desc = self.resolve_schema(&schema)?;
877                            let id = GlobalId::User(id);
878                            register_catalog_object(
879                                &mut catalog,
880                                &mut name_to_id,
881                                id,
882                                desc.typ().clone(),
883                            )?;
884                            let shard = self.shard_id(&shard);
885                            builder.import_persist(
886                                id,
887                                PersistSource {
888                                    shard,
889                                    location: self.loc.clone(),
890                                    desc,
891                                    upper: Timestamp::from(upper),
892                                },
893                            );
894                        }
895                        ImportSpec::Index { index_id } => {
896                            let entry = self.indexes.get(&index_id).ok_or_else(|| {
897                                anyhow::anyhow!(
898                                    "unknown index {index_id}; define it before importing it"
899                                )
900                            })?;
901                            let on_id = GlobalId::User(entry.on_id);
902                            let key = entry.key.clone();
903                            let on_type = entry.on_type.clone();
904                            register_catalog_object(
905                                &mut catalog,
906                                &mut name_to_id,
907                                on_id,
908                                SqlRelationType::from_repr(&on_type),
909                            )?;
910                            builder.import_index(
911                                GlobalId::User(index_id),
912                                on_id,
913                                key,
914                                on_type,
915                                false,
916                            );
917                        }
918                    }
919                }
920                for build in builds {
921                    // Parse the pretty MIR spec against the catalog, then remap
922                    // its catalog-assigned `Get` ids to the script's ids.
923                    let mut expr = try_parse_mir(&catalog, &build.expr)
924                        .map_err(|e| anyhow::anyhow!("parsing MIR for object {}: {e}", build.id))?;
925                    remap_gets(&mut expr, &catalog, &name_to_id)?;
926                    let id = GlobalId::User(build.id);
927                    // Register the built object so later builds can `get` it.
928                    register_catalog_object(
929                        &mut catalog,
930                        &mut name_to_id,
931                        id,
932                        SqlRelationType::from_repr(&expr.typ()),
933                    )?;
934                    builder.build(id, expr);
935                }
936                // Wire each export onto the builder. Index exports are captured for
937                // later import / count assertions; sink exports route their output
938                // either to a target shard (materialized view) or back as responses
939                // (subscribe). Sink output schemas must match the exported object's
940                // type, validated here so a mismatch fails before submission.
941                let mut new_indexes = Vec::new();
942                let mut new_subscribes = Vec::new();
943                let mut new_mv_outputs = Vec::new();
944                for export in exports {
945                    match export {
946                        ExportSpec::Index {
947                            index_id,
948                            on_id,
949                            key,
950                        } => {
951                            let on_type = builder.get(GlobalId::User(on_id))?.typ();
952                            builder.export_index(
953                                GlobalId::User(index_id),
954                                GlobalId::User(on_id),
955                                key.clone(),
956                            );
957                            new_indexes.push((index_id, on_id, key, on_type));
958                        }
959                        ExportSpec::MaterializedView {
960                            sink_id,
961                            on_id,
962                            shard,
963                            schema,
964                        } => {
965                            let desc = self.resolve_schema(&schema)?;
966                            self.check_sink_schema(&builder, on_id, &desc)?;
967                            let shard = self.shard_id(&shard);
968                            let location = self.loc.clone();
969                            builder.export_materialized_view(
970                                GlobalId::User(sink_id),
971                                GlobalId::User(on_id),
972                                desc.clone(),
973                                PersistSink {
974                                    shard,
975                                    location: location.clone(),
976                                },
977                            );
978                            // Record the target shard so a later `peek` of the sink
979                            // id reads it via a persist peek (the `SELECT * FROM mv`
980                            // path), with no separate read-back command.
981                            new_mv_outputs.push((
982                                sink_id,
983                                CollectionMetadata {
984                                    persist_location: location,
985                                    data_shard: shard,
986                                    relation_desc: desc,
987                                    txns_shard: None,
988                                },
989                            ));
990                        }
991                        ExportSpec::Subscribe {
992                            sink_id,
993                            on_id,
994                            schema,
995                            up_to,
996                        } => {
997                            let desc = self.resolve_schema(&schema)?;
998                            self.check_sink_schema(&builder, on_id, &desc)?;
999                            builder.export_subscribe(
1000                                GlobalId::User(sink_id),
1001                                GlobalId::User(on_id),
1002                                desc,
1003                                up_to_antichain(up_to),
1004                            );
1005                            new_subscribes.push(sink_id);
1006                        }
1007                    }
1008                }
1009                builder.as_of(Timestamp::from(as_of));
1010                let df = builder.finish()?;
1011                self.driver.submit_dataflow(df)?;
1012                // Register only after a successful submit, so a rejected dataflow
1013                // leaves no dangling index entry or subscribe buffer.
1014                for (index_id, on_id, key, on_type) in new_indexes {
1015                    self.indexes.insert(
1016                        index_id,
1017                        IndexEntry {
1018                            on_id,
1019                            key,
1020                            on_type,
1021                        },
1022                    );
1023                }
1024                for sink_id in new_subscribes {
1025                    self.driver.register_subscribe(GlobalId::User(sink_id));
1026                }
1027                for (sink_id, metadata) in new_mv_outputs {
1028                    self.mv_outputs.insert(sink_id, metadata);
1029                }
1030                Ok("ok".to_string())
1031            }
1032            Command::Peek { id, schema, ts } => {
1033                let desc = self.resolve_schema(&schema)?;
1034                // A materialized-view sink id resolves to a persist peek of its
1035                // output shard; any other id is an index peek. The persist peek
1036                // blocks until the shard seals through `ts`, so it doubles as a wait
1037                // for the writing sink to catch up.
1038                let target = match self.mv_outputs.get(&id) {
1039                    Some(metadata) => PeekTarget::Persist {
1040                        id: GlobalId::User(id),
1041                        metadata: metadata.clone(),
1042                    },
1043                    None => PeekTarget::Index {
1044                        id: GlobalId::User(id),
1045                    },
1046                };
1047                let rows = self.driver.peek(target, desc, Timestamp::from(ts)).await?;
1048                Ok(render_rows(&rows))
1049            }
1050            Command::AwaitSubscribe {
1051                id,
1052                up_to,
1053                timeout_secs,
1054            } => {
1055                let timeout = Duration::from_secs(timeout_secs.unwrap_or(DEFAULT_TIMEOUT_SECS));
1056                let updates = self
1057                    .driver
1058                    .await_subscribe(GlobalId::User(id), Timestamp::from(up_to), timeout)
1059                    .await?;
1060                Ok(render_updates(&updates))
1061            }
1062            Command::CreateInstance {
1063                expiration_offset,
1064                arrangement_dictionary_compression,
1065            } => {
1066                let expiration_offset = expiration_offset
1067                    .as_deref()
1068                    .map(|s| {
1069                        <Duration as ConfigType>::parse(s).map_err(|e| {
1070                            anyhow::anyhow!("expiration-offset {s:?} is not a duration: {e}")
1071                        })
1072                    })
1073                    .transpose()?;
1074                self.driver
1075                    .create_instance(expiration_offset, arrangement_dictionary_compression)?;
1076                Ok("ok".to_string())
1077            }
1078            Command::UpdateConfiguration { updates } => {
1079                let mut dyncfg_updates = ConfigUpdates::default();
1080                for setting in &updates {
1081                    dyncfg_updates.add_dynamic(
1082                        &setting.name,
1083                        parse_config_val(&setting.ty, &setting.value)?,
1084                    );
1085                }
1086                self.driver.update_configuration(dyncfg_updates)?;
1087                Ok("ok".to_string())
1088            }
1089            Command::Reconnect => {
1090                self.driver.reconnect().await?;
1091                Ok("ok".to_string())
1092            }
1093            Command::InitializationComplete => {
1094                self.driver.send(ComputeCommand::InitializationComplete)?;
1095                Ok("ok".to_string())
1096            }
1097        }
1098    }
1099}
1100
1101/// Run a script: parse `content` into stanzas, execute each command, and either
1102/// compare its output to the stanza's expected block or — when `REWRITE` is set
1103/// and `path` is given — rewrite the file in place with the actual outputs.
1104///
1105/// Returns `Err` if any stanza's output differs from its expected block, so a
1106/// scripted run exits non-zero on a mismatch (and CI fails). A command that fails
1107/// renders as `error: <message>`, so an expected failure is asserted by its
1108/// golden block rather than a special command.
1109pub async fn run(
1110    driver: Driver,
1111    loc: PersistLocation,
1112    content: &str,
1113    path: Option<&Path>,
1114) -> anyhow::Result<()> {
1115    let items = crate::text::parse_file(content)?;
1116    let mut state = ScriptState::new(driver, loc).await?;
1117    let rewrite = std::env::var_os("REWRITE").is_some();
1118
1119    let mut actuals = Vec::new();
1120    let mut mismatches = 0usize;
1121    for item in &items {
1122        let crate::text::Item::Stanza(stanza) = item else {
1123            continue;
1124        };
1125        let actual = match state.execute(stanza.command.clone()).await {
1126            Ok(output) => output,
1127            Err(e) => format!("error: {e}"),
1128        };
1129        let directive = stanza.input.lines().next().unwrap_or_default();
1130        if rewrite {
1131            println!("{directive} => {actual}");
1132        } else if actual == stanza.expected {
1133            println!("ok: {directive}");
1134        } else {
1135            mismatches += 1;
1136            println!(
1137                "MISMATCH: {directive}\n  expected: {:?}\n  actual:   {:?}",
1138                stanza.expected, actual
1139            );
1140        }
1141        actuals.push(actual);
1142    }
1143
1144    if rewrite {
1145        let path = path.context("REWRITE is set but the script came from stdin")?;
1146        std::fs::write(path, crate::text::rewrite(&items, &actuals))
1147            .with_context(|| format!("rewriting {}", path.display()))?;
1148        return Ok(());
1149    }
1150    if mismatches > 0 {
1151        anyhow::bail!("{mismatches} stanza(s) did not match their expected output");
1152    }
1153    Ok(())
1154}
1155
1156/// Render peeked rows as deterministic golden text: each row's datums joined by
1157/// spaces, with the rows sorted so the output is independent of arrangement order.
1158fn render_rows(rows: &[Row]) -> String {
1159    let mut lines: Vec<String> = rows
1160        .iter()
1161        .map(|row| {
1162            row.unpack()
1163                .iter()
1164                .map(|datum| datum.to_string())
1165                .collect::<Vec<_>>()
1166                .join(" ")
1167        })
1168        .collect();
1169    lines.sort();
1170    lines.join("\n")
1171}
1172
1173/// Convert an optional `up_to` timestamp into a sink's exclusive upper antichain;
1174/// `None` is the empty antichain (no bound — the sink runs indefinitely).
1175fn up_to_antichain(up_to: Option<u64>) -> Antichain<Timestamp> {
1176    match up_to {
1177        Some(t) => Antichain::from_elem(Timestamp::from(t)),
1178        None => Antichain::new(),
1179    }
1180}
1181
1182/// Render a subscribe's updates as golden text: `<ts> <diff> <datums>` per line.
1183/// Updates are consolidated by `(time, row)` — so split batches and retractions
1184/// collapse — net-zero rows dropped, and the lines sorted for determinism.
1185fn render_updates(updates: &[(Row, Timestamp, i64)]) -> String {
1186    let mut by_key: BTreeMap<(Timestamp, Row), i64> = BTreeMap::new();
1187    for (row, ts, diff) in updates {
1188        *by_key.entry((*ts, row.clone())).or_default() += diff;
1189    }
1190    let mut lines: Vec<String> = by_key
1191        .into_iter()
1192        .filter(|(_, diff)| *diff != 0)
1193        .map(|((ts, row), diff)| {
1194            let datums = row
1195                .unpack()
1196                .iter()
1197                .map(|datum| datum.to_string())
1198                .collect::<Vec<_>>()
1199                .join(" ");
1200            format!("{ts} {diff} {datums}")
1201        })
1202        .collect();
1203    lines.sort();
1204    lines.join("\n")
1205}
1206
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a `ColumnSpec` with the given name, type name and
    /// nullability.
    fn column(name: &str, ty: &str, nullable: bool) -> ColumnSpec {
        ColumnSpec {
            name: name.to_string(),
            ty: ty.to_string(),
            nullable,
        }
    }

    /// The column specs of a `define_schema` become a `RelationDesc` with the
    /// same arity, scalar types and nullability; `synth_rows` produces the
    /// requested number of rows for it, and an unknown type name is rejected.
    #[mz_ore::test]
    fn schema_parse_and_synth() {
        let columns = vec![
            column("k", "bigint", false),
            column("flag", "boolean", false),
            column("v", "text", true),
        ];
        let desc = relation_desc(&columns).unwrap();
        assert_eq!(desc.arity(), 3);

        let mut types = desc.iter_types();
        assert_eq!(types.next().unwrap().scalar_type, SqlScalarType::Int64);
        assert_eq!(types.next().unwrap().scalar_type, SqlScalarType::Bool);
        assert!(types.next().unwrap().nullable);

        assert_eq!(synth_rows(&desc, 0, 4, 8).len(), 4);

        assert!(scalar_type_from_str("nope").is_err());
    }

    /// A token is typed into the `Cell` matching its column. A bare `null` is SQL
    /// null and is refused by a non-nullable column, and a token that is not a
    /// number is refused by an integer column.
    #[mz_ore::test]
    fn cell_from_token_maps_values() {
        let int_col = SqlColumnType {
            scalar_type: SqlScalarType::Int64,
            nullable: false,
        };
        let str_col = SqlColumnType {
            scalar_type: SqlScalarType::String,
            nullable: true,
        };

        let accepted = [
            ("7", &int_col, Cell::Int64(7)),
            ("hi", &str_col, Cell::Str("hi".to_string())),
            // The surrounding quotes are stripped; the contents are kept.
            ("\"a b\"", &str_col, Cell::Str("a b".to_string())),
            ("null", &str_col, Cell::Null),
        ];
        for (token, col, want) in accepted {
            assert_eq!(cell_from_token(token, col).unwrap(), want, "token {token:?}");
        }

        // `null` in a non-nullable column and a non-numeric token in an integer
        // column are both errors.
        for token in ["null", "x"] {
            assert!(cell_from_token(token, &int_col).is_err(), "token {token:?}");
        }
    }
}
1273}