mz_clusterd_test_driver/script.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Executes a text command script against `clusterd`.
11//!
12//! Instead of recompiling a Rust scenario, a test (or an agent) writes a
13//! [`crate::text`] script: a sequence of commands, each with an expected output
14//! block (`----`) that is the assertion. The coarse orchestration verbs map almost
//! directly to [`Driver`] calls; `create-dataflow` carries arbitrary MIR (as
//! pretty-form specs parsed by `mz-expr-parser`, the `.spec` test syntax) over the full
17//! [`DataflowBuilder`] surface, including index imports, while `define_index`
18//! stays as sugar for the common single-index shape. Explicit `write_rows`
19//! payloads are typed against the schema token-by-token via `cell_from_token`
20//! (reusing `mz_repr::strconv`) rather than `Row`'s opaque serde.
21//!
22//! # Execution
23//!
24//! [`run`] parses the script, executes each command, and compares its golden
25//! output to the expected block — failing the run on a mismatch, or rewriting the
26//! file when `REWRITE` is set. A command that fails renders as `error: <message>`,
27//! so an expected failure is asserted by its golden block. Assertions are
28//! level-triggered waits on monotonic frontiers, so a single sequential script is
29//! deterministic regardless of how the dataflows interleave.
30//!
31//! Shards are referenced by a string alias; the first command naming an alias
32//! allocates a fresh [`ShardId`] for it. Object ids are raw `u64`s mapped to
33//! [`GlobalId::User`].
34
35use std::collections::BTreeMap;
36use std::path::Path;
37use std::time::Duration;
38
39use anyhow::Context;
40use mz_compute_client::protocol::command::{ComputeCommand, PeekTarget};
41use mz_dyncfg::{ConfigType, ConfigUpdates, ConfigVal};
42use mz_expr::visit::Visit;
43use mz_expr::{Id, MirRelationExpr};
44use mz_expr_parser::{TestCatalog, try_parse_mir};
45use mz_persist_client::PersistClient;
46use mz_persist_types::{PersistLocation, ShardId};
47use mz_repr::{
48 GlobalId, RelationDesc, ReprRelationType, Row, SqlColumnType, SqlRelationType, SqlScalarType,
49 Timestamp, strconv,
50};
51use mz_storage_types::controller::CollectionMetadata;
52use serde::{Deserialize, Serialize};
53use timely::progress::Antichain;
54
55use crate::data::{
56 Cell, pack_cells, sample_desc, synth_rows, write_rows_single_ts, write_rows_spread,
57};
58use crate::dataflow::{
59 DataflowBuilder, PersistSink, PersistSource, count_over_index, index_dataflow,
60};
61use crate::driver::Driver;
62
/// The default payload padding (bytes) for synthetic rows when a `write_*` command
/// omits `row_bytes`.
const DEFAULT_ROW_BYTES: usize = 64;
/// The default timeout (seconds) for frontier waits: `await_frontier` and
/// `await_subscribe` when a command omits `timeout_secs`, and the count sugar's
/// internal wait for its ephemeral dataflow to hydrate.
const DEFAULT_TIMEOUT_SECS: u64 = 600;
67
/// A column declaration in a `define_schema` command.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ColumnSpec {
    /// Column name.
    pub name: String,
    /// Scalar type name, spelled `type` in the script. Matched
    /// case-insensitively by `scalar_type_from_str`, which also accepts SQL
    /// aliases (e.g. `bigint` for `int64`, `text` for `string`).
    #[serde(rename = "type")]
    pub ty: String,
    /// Whether the column admits `NULL`; `false` when omitted.
    #[serde(default)]
    pub nullable: bool,
}
80
/// A single dyncfg update in an `update-configuration` command.
///
/// It has three parts: a config name, a type tag selecting how `value` is
/// parsed (`bool`/`u32`/`usize`/`f64`/`string`/`duration`), and the value
/// itself. The tag is typed against [`mz_dyncfg`] at execution by
/// `parse_config_val`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConfigSetting {
    /// The dyncfg name (sent to the replica by name; unknown names are ignored).
    pub name: String,
    /// The type tag selecting the [`ConfigVal`] variant; spelled `type` in the
    /// script and matched case-sensitively.
    #[serde(rename = "type")]
    pub ty: String,
    /// The value, parsed against `ty` (`string` values are taken verbatim).
    pub value: String,
}
94
/// A collection to import in a `create-dataflow` command
/// ([`Command::CreateDataflow`]): a persist source or an existing index.
///
/// Externally tagged with snake_case variant names: `{"source": {…}}` or
/// `{"index": {…}}`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ImportSpec {
    /// Import a persist-backed storage collection, as `define_index` does.
    Source {
        /// The imported source's global id (also its name, `u<id>`, in MIR specs).
        id: u64,
        /// Shard alias to import; allocated on first use.
        shard: String,
        /// Schema name; defaults to the built-in sample schema.
        #[serde(default)]
        schema: Option<String>,
        /// The shard's exclusive write upper (see `PersistSource::upper`).
        /// Required: there is no default.
        upper: u64,
    },
    /// Import an existing index by its global id. Its arranged collection, key,
    /// and type are taken from the registry, so the index must have been
    /// defined first. MIR specs reference the arranged collection's id, not
    /// the index id.
    Index {
        /// The index's global id.
        index_id: u64,
    },
}
119
/// A MIR object to build in a `create-dataflow` command
/// ([`Command::CreateDataflow`]).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BuildSpec {
    /// The built object's global id. Later builds in the same command can
    /// reference it as `u<id>`.
    pub id: u64,
    /// The computation, as a pretty-form MIR spec parsed by `mz-expr-parser`
    /// (e.g. `Reduce aggregates=[count(*)]` over `Get u1000`). It references
    /// imported or previously-built objects by their global-id name (`u<n>`).
    /// The leaf `Get`'s type is resolved from the import, not authored.
    pub expr: String,
}
131
/// An export in a `create-dataflow` command, mirroring the export kinds a real
/// dataflow produces (see [`mz_compute_types::sinks::ComputeSinkConnection`]).
///
/// Variants are externally tagged with kebab-case names: `index`,
/// `materialized-view`, `subscribe`. Field names keep their snake_case
/// spelling. `copy-to` is intentionally absent: the parser rejects it as
/// unimplemented.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum ExportSpec {
    /// An arrangement, peekable as an index and importable by later dataflows.
    Index {
        /// The exported index's global id.
        index_id: u64,
        /// The imported or built id the index arranges.
        on_id: u64,
        /// Columns to arrange by.
        key: Vec<usize>,
    },
    /// A persist sink writing the collection to a shard (a materialized view).
    /// It is verified by reading the shard back with a persist `peek` of the
    /// sink id.
    MaterializedView {
        /// The sink's global id (scheduled and frontier-tracked under this id).
        sink_id: u64,
        /// The imported or built id the sink writes.
        on_id: u64,
        /// Target shard alias; allocated on first use.
        shard: String,
        /// Output schema; defaults to the sample schema when omitted. Must match
        /// `on_id`'s type.
        schema: Option<String>,
    },
    /// A subscribe sink streaming changes back as responses, collected by
    /// `await-subscribe`.
    Subscribe {
        /// The sink's global id.
        sink_id: u64,
        /// The imported or built id the sink streams.
        on_id: u64,
        /// Output schema; defaults to the sample schema when omitted. Must match
        /// `on_id`'s type.
        schema: Option<String>,
        /// Exclusive upper at which the subscribe completes; unbounded if absent.
        up_to: Option<u64>,
    },
}
172
173/// Map a JSON type name to a [`SqlScalarType`]. The supported set is intentionally
174/// small and matches [`crate::data::Cell`]; extend both together.
175fn scalar_type_from_str(s: &str) -> anyhow::Result<SqlScalarType> {
176 Ok(match s.to_ascii_lowercase().as_str() {
177 "int16" | "smallint" => SqlScalarType::Int16,
178 "int32" | "int" | "integer" => SqlScalarType::Int32,
179 "int64" | "bigint" => SqlScalarType::Int64,
180 "bool" | "boolean" => SqlScalarType::Bool,
181 "string" | "text" => SqlScalarType::String,
182 "bytes" | "bytea" => SqlScalarType::Bytes,
183 other => anyhow::bail!("unsupported column type {other:?}"),
184 })
185}
186
187/// Parse a configuration value string into a [`ConfigVal`], selecting the variant
188/// by a type tag. Each numeric/bool/duration type reuses [`mz_dyncfg`]'s own
189/// `ConfigType::parse` (so the accepted syntax — e.g. `on`/`off` for bool, humantime
190/// for duration — matches the rest of the codebase); `string` is taken verbatim.
191fn parse_config_val(ty: &str, value: &str) -> anyhow::Result<ConfigVal> {
192 let err = |e: String| anyhow::anyhow!("config value {value:?} is not a valid {ty}: {e}");
193 Ok(match ty {
194 "bool" => <bool as ConfigType>::parse(value).map_err(err)?.into(),
195 "u32" => <u32 as ConfigType>::parse(value).map_err(err)?.into(),
196 "usize" => <usize as ConfigType>::parse(value).map_err(err)?.into(),
197 "f64" => <f64 as ConfigType>::parse(value).map_err(err)?.into(),
198 "duration" => <Duration as ConfigType>::parse(value).map_err(err)?.into(),
199 "string" => ConfigVal::String(value.to_string()),
200 other => anyhow::bail!(
201 "unsupported config type {other:?}; use bool/u32/usize/f64/duration/string"
202 ),
203 })
204}
205
206/// Build a [`RelationDesc`] from column specs.
207fn relation_desc(columns: &[ColumnSpec]) -> anyhow::Result<RelationDesc> {
208 let mut builder = RelationDesc::builder();
209 for col in columns {
210 builder = builder.with_column(
211 col.name.as_str(),
212 SqlColumnType {
213 scalar_type: scalar_type_from_str(&col.ty)?,
214 nullable: col.nullable,
215 },
216 );
217 }
218 Ok(builder.finish())
219}
220
/// Remove one pair of enclosing double quotes from `s`, if present.
///
/// `s` is returned unchanged unless it is at least two bytes long and both
/// starts and ends with `"`. Inner quotes are not unescaped.
fn unquote(s: &str) -> &str {
    let is_quoted = s.len() >= 2 && s.starts_with('"') && s.ends_with('"');
    if is_quoted {
        // `"` is a single byte, so both slice bounds fall on char boundaries.
        &s[1..s.len() - 1]
    } else {
        s
    }
}
227
228/// Type a raw row-value token against its column into an owned [`Cell`].
229///
230/// The bare token `null` is SQL `NULL` (only in a nullable column); quote it
231/// (`"null"`) for the literal string. Numeric and boolean tokens go through
232/// [`mz_repr::strconv`] — the canonical PostgreSQL-compatible text parser the rest
233/// of the codebase uses — so the accepted syntax matches `mz_pgrepr`'s text decode.
234/// `string`/`bytes` columns take the (unquoted) token verbatim; `bytes` is its
235/// UTF-8 encoding.
236fn cell_from_token(token: &str, col: &SqlColumnType) -> anyhow::Result<Cell> {
237 if token == "null" {
238 anyhow::ensure!(col.nullable, "null value in non-nullable column");
239 return Ok(Cell::Null);
240 }
241 let parse =
242 |kind: &str, e: strconv::ParseError| anyhow::anyhow!("parsing {token:?} as {kind}: {e}");
243 let cell = match col.scalar_type {
244 SqlScalarType::Int16 => {
245 Cell::Int16(strconv::parse_int16(token).map_err(|e| parse("int16", e))?)
246 }
247 SqlScalarType::Int32 => {
248 Cell::Int32(strconv::parse_int32(token).map_err(|e| parse("int32", e))?)
249 }
250 SqlScalarType::Int64 => {
251 Cell::Int64(strconv::parse_int64(token).map_err(|e| parse("int64", e))?)
252 }
253 SqlScalarType::Bool => {
254 Cell::Bool(strconv::parse_bool(token).map_err(|e| parse("bool", e))?)
255 }
256 SqlScalarType::String => Cell::Str(unquote(token).to_string()),
257 SqlScalarType::Bytes => Cell::Bytes(unquote(token).as_bytes().to_vec()),
258 ref other => anyhow::bail!("unsupported column type {other:?}"),
259 };
260 Ok(cell)
261}
262
263/// Pack explicit row tokens against `desc`, validating arity per row.
264fn rows_from_tokens(desc: &RelationDesc, rows: &[Vec<String>]) -> anyhow::Result<Vec<Row>> {
265 let cols: Vec<&SqlColumnType> = desc.iter_types().collect();
266 let mut out = Vec::with_capacity(rows.len());
267 for (r, row) in rows.iter().enumerate() {
268 anyhow::ensure!(
269 row.len() == cols.len(),
270 "row {r} has {} values but schema has {} columns",
271 row.len(),
272 cols.len()
273 );
274 // Arity validated above, so indexing `cols` by position is in bounds.
275 let cells = row
276 .iter()
277 .enumerate()
278 .map(|(c, v)| cell_from_token(v, cols[c]))
279 .collect::<anyhow::Result<Vec<Cell>>>()?;
280 out.push(pack_cells(&cells));
281 }
282 Ok(out)
283}
284
285/// Register a referenceable object in the parser `catalog` under its global-id
286/// name (e.g. `u1000`), recording the name-to-id mapping so the parsed `Get`s —
287/// which carry the catalog's own assigned ids — can be remapped back to the
288/// script's ids.
289fn register_catalog_object(
290 catalog: &mut TestCatalog,
291 name_to_id: &mut BTreeMap<String, GlobalId>,
292 id: GlobalId,
293 sql_typ: SqlRelationType,
294) -> anyhow::Result<()> {
295 let name = id.to_string();
296 // Column names are only used for display; `Get` references columns by `#n`,
297 // so synthetic `c0..cN` names suffice.
298 let cols = (0..sql_typ.column_types.len())
299 .map(|i| format!("c{i}"))
300 .collect();
301 catalog
302 .insert(&name, cols, sql_typ, false)
303 .map_err(|e| anyhow::anyhow!("registering {name} in catalog: {e}"))?;
304 name_to_id.insert(name, id);
305 Ok(())
306}
307
308/// Rewrite every global `Get` in `expr` from the catalog's assigned id back to
309/// the script's id, looked up by the object's name.
310fn remap_gets(
311 expr: &mut MirRelationExpr,
312 catalog: &TestCatalog,
313 name_to_id: &BTreeMap<String, GlobalId>,
314) -> anyhow::Result<()> {
315 expr.try_visit_mut_post::<_, anyhow::Error>(&mut |e| {
316 if let MirRelationExpr::Get {
317 id: Id::Global(g), ..
318 } = e
319 {
320 let name = catalog
321 .get_source_name(g)
322 .ok_or_else(|| anyhow::anyhow!("get of unknown catalog object {g}"))?;
323 let id = name_to_id
324 .get(name)
325 .ok_or_else(|| anyhow::anyhow!("get of unregistered object {name}"))?;
326 *g = *id;
327 }
328 Ok(())
329 })
330}
331
/// A command read from the script stream.
///
/// Tagged on `"cmd"`, snake_case, e.g.
/// `{"cmd":"write_single_ts","shard":"s1","ts":0,"count":1000}`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "cmd", rename_all = "snake_case")]
pub enum Command {
    /// Declare a named relation schema for later `schema` references.
    DefineSchema {
        /// Schema name, referenced by `schema` fields on other commands.
        name: String,
        /// Ordered column declarations.
        columns: Vec<ColumnSpec>,
    },
    /// Write `count` synthetic rows to `shard` at a single timestamp `ts`.
    WriteSingleTs {
        /// Shard alias; allocated on first use.
        shard: String,
        /// Schema name; defaults to the built-in `(bigint, text)` sample schema.
        #[serde(default)]
        schema: Option<String>,
        /// The timestamp to write at.
        ts: u64,
        /// Number of synthetic rows to write.
        count: u64,
        /// First synthetic row index, so successive batches can use disjoint id
        /// ranges (`start..start + count`) that never consolidate. Defaults to 0.
        #[serde(default)]
        start: u64,
        /// Payload padding per row; defaults to `DEFAULT_ROW_BYTES`.
        #[serde(default)]
        row_bytes: Option<usize>,
    },
    /// Write `count` synthetic rows to `shard`, spread across `n_ts` timestamps in a
    /// single append.
    WriteSpread {
        /// Shard alias; allocated on first use.
        shard: String,
        /// Schema name; defaults to the built-in sample schema.
        #[serde(default)]
        schema: Option<String>,
        /// Number of synthetic rows to write.
        count: u64,
        /// Number of distinct timestamps to spread the rows across.
        n_ts: u64,
        /// First synthetic row index (see [`Command::WriteSingleTs`]). Defaults to 0.
        #[serde(default)]
        start: u64,
        /// Payload padding per row; defaults to `DEFAULT_ROW_BYTES`.
        #[serde(default)]
        row_bytes: Option<usize>,
    },
    /// Write explicit rows to `shard` at a single timestamp `ts`. Each row is an
    /// array of raw value tokens (strings), one per schema column in order. The
    /// tokens are typed against the schema by `cell_from_token`.
    WriteRows {
        /// Shard alias; allocated on first use.
        shard: String,
        /// Schema name; defaults to the built-in sample schema.
        #[serde(default)]
        schema: Option<String>,
        /// The timestamp to write at.
        ts: u64,
        /// Rows as arrays of raw value tokens, typed against the schema by
        /// `cell_from_token` (bare `null` is SQL `NULL`).
        rows: Vec<Vec<String>>,
    },
    /// Submit (without scheduling) an index dataflow over `shard`.
    DefineIndex {
        /// The imported source's global id.
        source_id: u64,
        /// The exported index's global id.
        index_id: u64,
        /// Shard alias to import. Like every alias it is allocated on first use,
        /// so an alias nothing has written to yet refers to a fresh, empty shard.
        shard: String,
        /// Schema name; defaults to the built-in sample schema. Must match what was
        /// written to `shard`.
        #[serde(default)]
        schema: Option<String>,
        /// Columns to arrange by; each must be below the schema's arity.
        key: Vec<usize>,
        /// The dataflow's `as_of`.
        as_of: u64,
        /// The shard's exclusive write upper (see `PersistSource::upper`).
        upper: u64,
    },
    /// Schedule a previously-submitted collection so it makes progress.
    Schedule {
        /// The collection's global id.
        id: u64,
    },
    /// Advance an index's read frontier (`since`) via `AllowCompaction`.
    AllowCompaction {
        /// The index's global id.
        id: u64,
        /// The new read frontier.
        frontier: u64,
    },
    /// Take a collection out of read-only mode via `AllowWrites`, letting its
    /// persist sink begin writing. Every dataflow starts read-only. Indexes,
    /// subscribes, and peeks work regardless, but a materialized-view sink
    /// withholds all persist writes until this is sent for its sink id.
    AllowWrites {
        /// The sink's global id.
        id: u64,
    },
    /// Wait until `id`'s output frontier reaches `ts`, or fail after the timeout.
    AwaitFrontier {
        /// The collection's global id.
        id: u64,
        /// The target output-frontier timestamp.
        ts: u64,
        /// Timeout in seconds; defaults to `DEFAULT_TIMEOUT_SECS`.
        #[serde(default)]
        timeout_secs: Option<u64>,
        /// If true, the outcome is not asserted. The command always prints
        /// `awaited`, whether the frontier was reached, the wait timed out, or
        /// the wait failed for any other reason. Used by reproductions where
        /// not reaching the frontier is an expected outcome, not an assertion
        /// failure.
        #[serde(default)]
        allow_timeout: bool,
    },
    /// Count `id`'s rows at `ts` and emit the count.
    ///
    /// Sugar over a `Reduce`: builds an ephemeral dataflow that index-imports `id`,
    /// computes `count(*)` over it, and peeks the single-row result — so the count
    /// runs through a real reduce operator rather than being tallied in the driver.
    /// `id` must have been registered by a prior `define_index` (or an index
    /// export of [`Command::CreateDataflow`]). The golden output is the count;
    /// the script's `----` block asserts it.
    Count {
        /// The index's global id.
        id: u64,
        /// The timestamp to count at.
        ts: u64,
    },
    /// Submit (without scheduling) a dataflow built from generic MIR — the
    /// abstraction behind index / materialized-view / subscribe / copy-to.
    ///
    /// A projection of [`DataflowBuilder`]: import sources and/or existing indexes,
    /// build MIR objects (each a pretty-form MIR spec; see [`BuildSpec`]), and
    /// export over them. Exports are index, materialized-view, or subscribe (see
    /// [`ExportSpec`]); copy-to is not implemented. Exported indexes are registered
    /// for later import or count assertion; subscribe sinks register a response
    /// buffer for `await-subscribe`. `define-index` is sugar over this. The optional
    /// `optimize` flag runs the MIR optimizer before lowering (needed for joins).
    CreateDataflow {
        /// Debug name for the dataflow; defaults to `headless-create-dataflow`.
        #[serde(default)]
        name: Option<String>,
        /// Collections to import (persist sources and/or existing indexes).
        #[serde(default)]
        imports: Vec<ImportSpec>,
        /// MIR objects to compute, each bound to an id. Built in order, so a
        /// later build may reference an earlier one.
        #[serde(default)]
        builds: Vec<BuildSpec>,
        /// Exports over imported or built ids.
        #[serde(default)]
        exports: Vec<ExportSpec>,
        /// The dataflow's `as_of`.
        as_of: u64,
        /// Run the MIR optimizer before lowering (needed for e.g. joins). Off by
        /// default, so the caller's MIR is lowered faithfully.
        #[serde(default)]
        optimize: bool,
    },
    /// Peek `id` at `ts` and emit the returned rows (sorted, one per line). The
    /// generic output assertion: the script's `----` block holds the expected rows.
    Peek {
        /// The index's global id.
        id: u64,
        /// Schema name describing the peek's output; defaults to the sample schema.
        #[serde(default)]
        schema: Option<String>,
        /// The timestamp to peek at.
        ts: u64,
    },
    /// Wait for subscribe sink `id`'s upper to reach `up_to`, then emit its
    /// accumulated updates as `<ts> <diff> <datums>` lines (consolidated, sorted).
    /// The output assertion for a subscribe sink.
    AwaitSubscribe {
        /// The subscribe sink's global id.
        id: u64,
        /// The exclusive upper to wait for (typically the sink's `up_to`).
        up_to: u64,
        /// Timeout in seconds; defaults to `DEFAULT_TIMEOUT_SECS` (absent when
        /// omitted).
        timeout_secs: Option<u64>,
    },
    /// Send `CreateInstance`, opening the compute instance (and the reconciliation
    /// window). The settable [`InstanceConfig`] knobs default to the values a plain
    /// `create-instance` supplies. The peek-stash location is always the host's,
    /// and the peek-response stash is force-disabled (see
    /// `Driver::create_instance`).
    ///
    /// [`InstanceConfig`]: mz_compute_client::protocol::command::InstanceConfig
    CreateInstance {
        /// Replica expiration offset (a duration like `30s`); none if absent.
        #[serde(default)]
        expiration_offset: Option<String>,
        /// Whether arrangements use dictionary compression; `false` if absent.
        #[serde(default)]
        arrangement_dictionary_compression: bool,
    },
    /// Send `UpdateConfiguration` with a table of dyncfg updates (`name type value`
    /// rows). Generic over any configuration. The peek-response stash is not
    /// settable here (it is force-disabled at instance creation).
    UpdateConfiguration {
        /// The dyncfg updates to apply.
        #[serde(default)]
        updates: Vec<ConfigSetting>,
    },
    /// Drop the current connection and reconnect, sending only `Hello`. Re-issue
    /// `create_instance`, replay the dataflows the replica should keep, then
    /// `initialization_complete` to close the reconciliation window.
    Reconnect,
    /// Send `InitializationComplete`, closing the reconciliation window.
    InitializationComplete,
}
546
/// What the registry remembers about an exported index.
///
/// A later [`ImportSpec::Index`] import or a `count` assertion uses it to
/// reconstruct the import without re-declaring it.
struct IndexEntry {
    /// The id of the collection the index arranges.
    on_id: u64,
    /// The columns the index is arranged by.
    key: Vec<usize>,
    /// The arranged collection's relation type. It is passed to
    /// `import_index`, and converted to a [`SqlRelationType`] when registering
    /// the collection in the MIR parser's catalog.
    on_type: ReprRelationType,
}
557
/// The base for ephemeral global ids the count sugar allocates (via
/// `ScriptState::alloc_internal`, for both its `Reduce` and its output index).
///
/// It sits at `u64::MAX / 2`, far above any id a script would use, so its
/// dataflows never collide with user objects. NOTE(review): the code visible
/// here does not reject script ids at or above this base. Confirm that nothing
/// else does before relying on that.
const INTERNAL_ID_BASE: u64 = u64::MAX / 2;
561
/// Mutable state threaded through a script run.
///
/// Holds the [`Driver`] that commands are issued through, a persist client for
/// direct shard writes, and the registries that let later commands refer back
/// to objects earlier commands declared.
pub struct ScriptState {
    /// The driver that compute commands are sent through.
    driver: Driver,
    /// Persist client used by the `write_*` commands to append rows to shards.
    client: PersistClient,
    /// Persist location handed to every persist source a dataflow imports.
    loc: PersistLocation,
    /// Named schemas declared via `define_schema`.
    schemas: BTreeMap<String, RelationDesc>,
    /// Alias-to-shard map; aliases are allocated lazily on first use.
    shards: BTreeMap<String, ShardId>,
    /// Exported indexes, by global id, for later import / count assertions.
    indexes: BTreeMap<u64, IndexEntry>,
    /// Materialized-view sink outputs, by sink global id: the target shard's
    /// metadata, so a `peek` of the sink id reads its shard via a persist peek.
    mv_outputs: BTreeMap<u64, CollectionMetadata>,
    /// Next ephemeral id for the count sugar's dataflows; starts at
    /// [`INTERNAL_ID_BASE`].
    next_internal: u64,
}
579
580impl ScriptState {
581 /// Build the state from a connected driver and its persist location, opening a
582 /// persist client.
583 pub async fn new(driver: Driver, loc: PersistLocation) -> anyhow::Result<Self> {
584 let client = driver.host.client().await?;
585 Ok(ScriptState {
586 driver,
587 client,
588 loc,
589 schemas: BTreeMap::new(),
590 shards: BTreeMap::new(),
591 indexes: BTreeMap::new(),
592 mv_outputs: BTreeMap::new(),
593 next_internal: INTERNAL_ID_BASE,
594 })
595 }
596
597 /// Resolve a shard alias, allocating a fresh [`ShardId`] on first use.
598 fn shard_id(&mut self, alias: &str) -> ShardId {
599 *self
600 .shards
601 .entry(alias.to_string())
602 .or_insert_with(ShardId::new)
603 }
604
605 /// Allocate a fresh ephemeral global id for an internally-built dataflow.
606 fn alloc_internal(&mut self) -> GlobalId {
607 let id = self.next_internal;
608 self.next_internal += 1;
609 GlobalId::User(id)
610 }
611
612 /// Count the rows of a registered index at `ts` by running a `count(*)`
613 /// `Reduce` over it: build an ephemeral dataflow that index-imports `index_id`,
614 /// schedule and hydrate it, then peek its single-row output. An empty result
615 /// (the reduce emits no row over empty input) reads as a count of `0`.
616 async fn count_via_reduce(&mut self, index_id: u64, ts: u64) -> anyhow::Result<u64> {
617 let entry = self.indexes.get(&index_id).ok_or_else(|| {
618 anyhow::anyhow!("unknown index {index_id}; define it with define_index first")
619 })?;
620 let on_id = entry.on_id;
621 let key = entry.key.clone();
622 let on_type = entry.on_type.clone();
623
624 let reduce_id = self.alloc_internal();
625 let out_index_id = self.alloc_internal();
626 let df = count_over_index(
627 GlobalId::User(index_id),
628 GlobalId::User(on_id),
629 on_type,
630 key,
631 reduce_id,
632 out_index_id,
633 Timestamp::from(ts),
634 )?;
635 self.driver.submit_dataflow(df)?;
636 self.driver.schedule(out_index_id)?;
637 // The count is final once the output frontier passes `ts`.
638 self.driver
639 .expect_frontier(
640 out_index_id,
641 Timestamp::from(ts).step_forward(),
642 Duration::from_secs(DEFAULT_TIMEOUT_SECS),
643 )
644 .await?;
645
646 // The reduce output is a single non-null bigint column.
647 let count_desc = RelationDesc::builder()
648 .with_column(
649 "count",
650 SqlColumnType {
651 scalar_type: SqlScalarType::Int64,
652 nullable: false,
653 },
654 )
655 .finish();
656 let rows = self
657 .driver
658 .peek(
659 PeekTarget::Index { id: out_index_id },
660 count_desc,
661 Timestamp::from(ts),
662 )
663 .await?;
664 match rows.as_slice() {
665 // No row over empty input: count is zero.
666 [] => Ok(0),
667 [row] => {
668 let count = row.unpack_first().unwrap_int64();
669 Ok(u64::try_from(count)?)
670 }
671 other => anyhow::bail!(
672 "count reduce returned {} rows, expected 0 or 1",
673 other.len()
674 ),
675 }
676 }
677
678 /// Resolve a schema name, defaulting to the built-in sample schema when absent.
679 fn resolve_schema(&self, name: &Option<String>) -> anyhow::Result<RelationDesc> {
680 match name {
681 None => Ok(sample_desc()),
682 Some(name) => self.schemas.get(name).cloned().ok_or_else(|| {
683 anyhow::anyhow!("unknown schema {name:?}; declare it with define_schema first")
684 }),
685 }
686 }
687
688 /// Validate that a sink's declared output schema `desc` matches the column types
689 /// of the object `on_id` it exports. Compares column types (not inferred keys),
690 /// so a mismatched arity or type fails before the dataflow is submitted.
691 fn check_sink_schema(
692 &self,
693 builder: &DataflowBuilder,
694 on_id: u64,
695 desc: &RelationDesc,
696 ) -> anyhow::Result<()> {
697 let on_type = builder.get(GlobalId::User(on_id))?.typ();
698 let want = ReprRelationType::from(desc.typ());
699 anyhow::ensure!(
700 on_type.column_types == want.column_types,
701 "sink output schema does not match object {on_id}: \
702 declared {:?}, object is {:?}",
703 want.column_types,
704 on_type.column_types
705 );
706 Ok(())
707 }
708
709 /// Execute a single command, returning its golden output text.
710 pub async fn execute(&mut self, cmd: Command) -> anyhow::Result<String> {
711 match cmd {
712 Command::DefineSchema { name, columns } => {
713 let desc = relation_desc(&columns)?;
714 self.schemas.insert(name, desc);
715 Ok("ok".to_string())
716 }
717 Command::WriteSingleTs {
718 shard,
719 schema,
720 ts,
721 count,
722 start,
723 row_bytes,
724 } => {
725 let desc = self.resolve_schema(&schema)?;
726 let shard = self.shard_id(&shard);
727 let pad = row_bytes.unwrap_or(DEFAULT_ROW_BYTES);
728 let batch = synth_rows(&desc, start, count, pad);
729 write_rows_single_ts(&self.client, shard, &desc, &batch, Timestamp::from(ts))
730 .await?;
731 Ok(format!("wrote {count}"))
732 }
733 Command::WriteSpread {
734 shard,
735 schema,
736 count,
737 n_ts,
738 start,
739 row_bytes,
740 } => {
741 let desc = self.resolve_schema(&schema)?;
742 let shard = self.shard_id(&shard);
743 let pad = row_bytes.unwrap_or(DEFAULT_ROW_BYTES);
744 let batch = synth_rows(&desc, start, count, pad);
745 write_rows_spread(&self.client, shard, &desc, &batch, n_ts).await?;
746 Ok(format!("wrote {count}"))
747 }
748 Command::WriteRows {
749 shard,
750 schema,
751 ts,
752 rows,
753 } => {
754 let desc = self.resolve_schema(&schema)?;
755 let batch = rows_from_tokens(&desc, &rows)?;
756 let written = batch.len();
757 let shard = self.shard_id(&shard);
758 write_rows_single_ts(&self.client, shard, &desc, &batch, Timestamp::from(ts))
759 .await?;
760 Ok(format!("wrote {written}"))
761 }
762 Command::DefineIndex {
763 source_id,
764 index_id,
765 shard,
766 schema,
767 key,
768 as_of,
769 upper,
770 } => {
771 let desc = self.resolve_schema(&schema)?;
772 // Validate the key columns against the schema up front, so a bad
773 // index (e.g. key past the last column) yields a clean error rather
774 // than reaching the lowering with an out-of-range column reference.
775 let arity = desc.arity();
776 for &col in &key {
777 anyhow::ensure!(
778 col < arity,
779 "key column {col} out of range for a {arity}-column schema"
780 );
781 }
782 let shard = self.shard_id(&shard);
783 let on_type = ReprRelationType::from(desc.typ());
784 let df = index_dataflow(
785 GlobalId::User(source_id),
786 GlobalId::User(index_id),
787 shard,
788 self.loc.clone(),
789 desc,
790 key.clone(),
791 Timestamp::from(as_of),
792 Timestamp::from(upper),
793 )?;
794 self.driver.submit_dataflow(df)?;
795 // Register only after a successful submit, so a rejected index is
796 // not later importable or countable.
797 self.indexes.insert(
798 index_id,
799 IndexEntry {
800 on_id: source_id,
801 key,
802 on_type,
803 },
804 );
805 Ok("ok".to_string())
806 }
807 Command::Schedule { id } => {
808 self.driver.schedule(GlobalId::User(id))?;
809 Ok("ok".to_string())
810 }
811 Command::AllowCompaction { id, frontier } => {
812 self.driver.send(ComputeCommand::AllowCompaction {
813 id: GlobalId::User(id),
814 frontier: Antichain::from_elem(Timestamp::from(frontier)),
815 })?;
816 Ok("ok".to_string())
817 }
818 Command::AllowWrites { id } => {
819 self.driver
820 .send(ComputeCommand::AllowWrites(GlobalId::User(id)))?;
821 Ok("ok".to_string())
822 }
823 Command::AwaitFrontier {
824 id,
825 ts,
826 timeout_secs,
827 allow_timeout,
828 } => {
829 let timeout = Duration::from_secs(timeout_secs.unwrap_or(DEFAULT_TIMEOUT_SECS));
830 let result = self
831 .driver
832 .expect_frontier(GlobalId::User(id), Timestamp::from(ts), timeout)
833 .await;
834 if allow_timeout {
835 // The outcome is intentionally unobserved: emit a fixed token so
836 // the golden output stays deterministic whether or not the
837 // frontier was reached (see `multi_dataflow`, whose hydration is
838 // nondeterministic).
839 Ok("awaited".to_string())
840 } else {
841 result?;
842 Ok("ok".to_string())
843 }
844 }
845 Command::Count { id, ts } => {
846 let count = self.count_via_reduce(id, ts).await?;
847 Ok(count.to_string())
848 }
849 Command::CreateDataflow {
850 name,
851 imports,
852 builds,
853 exports,
854 as_of,
855 optimize,
856 } => {
857 let mut builder = DataflowBuilder::new(
858 name.unwrap_or_else(|| "headless-create-dataflow".to_string()),
859 );
860 if optimize {
861 builder.optimize();
862 }
863 // The parser's catalog resolves `Get u<n>` leaves by name; it
864 // assigns its own global ids, so we keep a name->our-id map and
865 // remap the parsed `Get`s back to the script's ids afterwards.
866 let mut catalog = TestCatalog::default();
867 let mut name_to_id: BTreeMap<String, GlobalId> = BTreeMap::new();
868 for import in imports {
869 match import {
870 ImportSpec::Source {
871 id,
872 shard,
873 schema,
874 upper,
875 } => {
876 let desc = self.resolve_schema(&schema)?;
877 let id = GlobalId::User(id);
878 register_catalog_object(
879 &mut catalog,
880 &mut name_to_id,
881 id,
882 desc.typ().clone(),
883 )?;
884 let shard = self.shard_id(&shard);
885 builder.import_persist(
886 id,
887 PersistSource {
888 shard,
889 location: self.loc.clone(),
890 desc,
891 upper: Timestamp::from(upper),
892 },
893 );
894 }
895 ImportSpec::Index { index_id } => {
896 let entry = self.indexes.get(&index_id).ok_or_else(|| {
897 anyhow::anyhow!(
898 "unknown index {index_id}; define it before importing it"
899 )
900 })?;
901 let on_id = GlobalId::User(entry.on_id);
902 let key = entry.key.clone();
903 let on_type = entry.on_type.clone();
904 register_catalog_object(
905 &mut catalog,
906 &mut name_to_id,
907 on_id,
908 SqlRelationType::from_repr(&on_type),
909 )?;
910 builder.import_index(
911 GlobalId::User(index_id),
912 on_id,
913 key,
914 on_type,
915 false,
916 );
917 }
918 }
919 }
920 for build in builds {
921 // Parse the pretty MIR spec against the catalog, then remap
922 // its catalog-assigned `Get` ids to the script's ids.
923 let mut expr = try_parse_mir(&catalog, &build.expr)
924 .map_err(|e| anyhow::anyhow!("parsing MIR for object {}: {e}", build.id))?;
925 remap_gets(&mut expr, &catalog, &name_to_id)?;
926 let id = GlobalId::User(build.id);
927 // Register the built object so later builds can `get` it.
928 register_catalog_object(
929 &mut catalog,
930 &mut name_to_id,
931 id,
932 SqlRelationType::from_repr(&expr.typ()),
933 )?;
934 builder.build(id, expr);
935 }
936 // Wire each export onto the builder. Index exports are captured for
937 // later import / count assertions; sink exports route their output
938 // either to a target shard (materialized view) or back as responses
939 // (subscribe). Sink output schemas must match the exported object's
940 // type, validated here so a mismatch fails before submission.
941 let mut new_indexes = Vec::new();
942 let mut new_subscribes = Vec::new();
943 let mut new_mv_outputs = Vec::new();
944 for export in exports {
945 match export {
946 ExportSpec::Index {
947 index_id,
948 on_id,
949 key,
950 } => {
951 let on_type = builder.get(GlobalId::User(on_id))?.typ();
952 builder.export_index(
953 GlobalId::User(index_id),
954 GlobalId::User(on_id),
955 key.clone(),
956 );
957 new_indexes.push((index_id, on_id, key, on_type));
958 }
959 ExportSpec::MaterializedView {
960 sink_id,
961 on_id,
962 shard,
963 schema,
964 } => {
965 let desc = self.resolve_schema(&schema)?;
966 self.check_sink_schema(&builder, on_id, &desc)?;
967 let shard = self.shard_id(&shard);
968 let location = self.loc.clone();
969 builder.export_materialized_view(
970 GlobalId::User(sink_id),
971 GlobalId::User(on_id),
972 desc.clone(),
973 PersistSink {
974 shard,
975 location: location.clone(),
976 },
977 );
978 // Record the target shard so a later `peek` of the sink
979 // id reads it via a persist peek (the `SELECT * FROM mv`
980 // path), with no separate read-back command.
981 new_mv_outputs.push((
982 sink_id,
983 CollectionMetadata {
984 persist_location: location,
985 data_shard: shard,
986 relation_desc: desc,
987 txns_shard: None,
988 },
989 ));
990 }
991 ExportSpec::Subscribe {
992 sink_id,
993 on_id,
994 schema,
995 up_to,
996 } => {
997 let desc = self.resolve_schema(&schema)?;
998 self.check_sink_schema(&builder, on_id, &desc)?;
999 builder.export_subscribe(
1000 GlobalId::User(sink_id),
1001 GlobalId::User(on_id),
1002 desc,
1003 up_to_antichain(up_to),
1004 );
1005 new_subscribes.push(sink_id);
1006 }
1007 }
1008 }
1009 builder.as_of(Timestamp::from(as_of));
1010 let df = builder.finish()?;
1011 self.driver.submit_dataflow(df)?;
1012 // Register only after a successful submit, so a rejected dataflow
1013 // leaves no dangling index entry or subscribe buffer.
1014 for (index_id, on_id, key, on_type) in new_indexes {
1015 self.indexes.insert(
1016 index_id,
1017 IndexEntry {
1018 on_id,
1019 key,
1020 on_type,
1021 },
1022 );
1023 }
1024 for sink_id in new_subscribes {
1025 self.driver.register_subscribe(GlobalId::User(sink_id));
1026 }
1027 for (sink_id, metadata) in new_mv_outputs {
1028 self.mv_outputs.insert(sink_id, metadata);
1029 }
1030 Ok("ok".to_string())
1031 }
1032 Command::Peek { id, schema, ts } => {
1033 let desc = self.resolve_schema(&schema)?;
1034 // A materialized-view sink id resolves to a persist peek of its
1035 // output shard; any other id is an index peek. The persist peek
1036 // blocks until the shard seals through `ts`, so it doubles as a wait
1037 // for the writing sink to catch up.
1038 let target = match self.mv_outputs.get(&id) {
1039 Some(metadata) => PeekTarget::Persist {
1040 id: GlobalId::User(id),
1041 metadata: metadata.clone(),
1042 },
1043 None => PeekTarget::Index {
1044 id: GlobalId::User(id),
1045 },
1046 };
1047 let rows = self.driver.peek(target, desc, Timestamp::from(ts)).await?;
1048 Ok(render_rows(&rows))
1049 }
1050 Command::AwaitSubscribe {
1051 id,
1052 up_to,
1053 timeout_secs,
1054 } => {
1055 let timeout = Duration::from_secs(timeout_secs.unwrap_or(DEFAULT_TIMEOUT_SECS));
1056 let updates = self
1057 .driver
1058 .await_subscribe(GlobalId::User(id), Timestamp::from(up_to), timeout)
1059 .await?;
1060 Ok(render_updates(&updates))
1061 }
1062 Command::CreateInstance {
1063 expiration_offset,
1064 arrangement_dictionary_compression,
1065 } => {
1066 let expiration_offset = expiration_offset
1067 .as_deref()
1068 .map(|s| {
1069 <Duration as ConfigType>::parse(s).map_err(|e| {
1070 anyhow::anyhow!("expiration-offset {s:?} is not a duration: {e}")
1071 })
1072 })
1073 .transpose()?;
1074 self.driver
1075 .create_instance(expiration_offset, arrangement_dictionary_compression)?;
1076 Ok("ok".to_string())
1077 }
1078 Command::UpdateConfiguration { updates } => {
1079 let mut dyncfg_updates = ConfigUpdates::default();
1080 for setting in &updates {
1081 dyncfg_updates.add_dynamic(
1082 &setting.name,
1083 parse_config_val(&setting.ty, &setting.value)?,
1084 );
1085 }
1086 self.driver.update_configuration(dyncfg_updates)?;
1087 Ok("ok".to_string())
1088 }
1089 Command::Reconnect => {
1090 self.driver.reconnect().await?;
1091 Ok("ok".to_string())
1092 }
1093 Command::InitializationComplete => {
1094 self.driver.send(ComputeCommand::InitializationComplete)?;
1095 Ok("ok".to_string())
1096 }
1097 }
1098 }
1099}
1100
1101/// Run a script: parse `content` into stanzas, execute each command, and either
1102/// compare its output to the stanza's expected block or — when `REWRITE` is set
1103/// and `path` is given — rewrite the file in place with the actual outputs.
1104///
1105/// Returns `Err` if any stanza's output differs from its expected block, so a
1106/// scripted run exits non-zero on a mismatch (and CI fails). A command that fails
1107/// renders as `error: <message>`, so an expected failure is asserted by its
1108/// golden block rather than a special command.
1109pub async fn run(
1110 driver: Driver,
1111 loc: PersistLocation,
1112 content: &str,
1113 path: Option<&Path>,
1114) -> anyhow::Result<()> {
1115 let items = crate::text::parse_file(content)?;
1116 let mut state = ScriptState::new(driver, loc).await?;
1117 let rewrite = std::env::var_os("REWRITE").is_some();
1118
1119 let mut actuals = Vec::new();
1120 let mut mismatches = 0usize;
1121 for item in &items {
1122 let crate::text::Item::Stanza(stanza) = item else {
1123 continue;
1124 };
1125 let actual = match state.execute(stanza.command.clone()).await {
1126 Ok(output) => output,
1127 Err(e) => format!("error: {e}"),
1128 };
1129 let directive = stanza.input.lines().next().unwrap_or_default();
1130 if rewrite {
1131 println!("{directive} => {actual}");
1132 } else if actual == stanza.expected {
1133 println!("ok: {directive}");
1134 } else {
1135 mismatches += 1;
1136 println!(
1137 "MISMATCH: {directive}\n expected: {:?}\n actual: {:?}",
1138 stanza.expected, actual
1139 );
1140 }
1141 actuals.push(actual);
1142 }
1143
1144 if rewrite {
1145 let path = path.context("REWRITE is set but the script came from stdin")?;
1146 std::fs::write(path, crate::text::rewrite(&items, &actuals))
1147 .with_context(|| format!("rewriting {}", path.display()))?;
1148 return Ok(());
1149 }
1150 if mismatches > 0 {
1151 anyhow::bail!("{mismatches} stanza(s) did not match their expected output");
1152 }
1153 Ok(())
1154}
1155
1156/// Render peeked rows as deterministic golden text: each row's datums joined by
1157/// spaces, with the rows sorted so the output is independent of arrangement order.
1158fn render_rows(rows: &[Row]) -> String {
1159 let mut lines: Vec<String> = rows
1160 .iter()
1161 .map(|row| {
1162 row.unpack()
1163 .iter()
1164 .map(|datum| datum.to_string())
1165 .collect::<Vec<_>>()
1166 .join(" ")
1167 })
1168 .collect();
1169 lines.sort();
1170 lines.join("\n")
1171}
1172
1173/// Convert an optional `up_to` timestamp into a sink's exclusive upper antichain;
1174/// `None` is the empty antichain (no bound — the sink runs indefinitely).
1175fn up_to_antichain(up_to: Option<u64>) -> Antichain<Timestamp> {
1176 match up_to {
1177 Some(t) => Antichain::from_elem(Timestamp::from(t)),
1178 None => Antichain::new(),
1179 }
1180}
1181
1182/// Render a subscribe's updates as golden text: `<ts> <diff> <datums>` per line.
1183/// Updates are consolidated by `(time, row)` — so split batches and retractions
1184/// collapse — net-zero rows dropped, and the lines sorted for determinism.
1185fn render_updates(updates: &[(Row, Timestamp, i64)]) -> String {
1186 let mut by_key: BTreeMap<(Timestamp, Row), i64> = BTreeMap::new();
1187 for (row, ts, diff) in updates {
1188 *by_key.entry((*ts, row.clone())).or_default() += diff;
1189 }
1190 let mut lines: Vec<String> = by_key
1191 .into_iter()
1192 .filter(|(_, diff)| *diff != 0)
1193 .map(|((ts, row), diff)| {
1194 let datums = row
1195 .unpack()
1196 .iter()
1197 .map(|datum| datum.to_string())
1198 .collect::<Vec<_>>()
1199 .join(" ");
1200 format!("{ts} {diff} {datums}")
1201 })
1202 .collect();
1203 lines.sort();
1204 lines.join("\n")
1205}
1206
1207#[cfg(test)]
1208mod tests {
1209 use super::*;
1210
1211 /// `define_schema` types map to a `RelationDesc` with matching arity and
1212 /// nullability, and `synth_rows` fills it.
1213 #[mz_ore::test]
1214 fn schema_parse_and_synth() {
1215 let columns = vec![
1216 ColumnSpec {
1217 name: "k".to_string(),
1218 ty: "bigint".to_string(),
1219 nullable: false,
1220 },
1221 ColumnSpec {
1222 name: "flag".to_string(),
1223 ty: "boolean".to_string(),
1224 nullable: false,
1225 },
1226 ColumnSpec {
1227 name: "v".to_string(),
1228 ty: "text".to_string(),
1229 nullable: true,
1230 },
1231 ];
1232 let desc = relation_desc(&columns).unwrap();
1233 assert_eq!(desc.arity(), 3);
1234 let types: Vec<_> = desc.iter_types().collect();
1235 assert_eq!(types[0].scalar_type, SqlScalarType::Int64);
1236 assert_eq!(types[1].scalar_type, SqlScalarType::Bool);
1237 assert!(types[2].nullable);
1238
1239 let rows = synth_rows(&desc, 0, 4, 8);
1240 assert_eq!(rows.len(), 4);
1241
1242 assert!(scalar_type_from_str("nope").is_err());
1243 }
1244
1245 /// Tokens type into the right `Cell`s against their column, bare `null` is SQL
1246 /// null (rejected for a non-nullable column), and a bad numeric token errors.
1247 #[mz_ore::test]
1248 fn cell_from_token_maps_values() {
1249 let int_col = SqlColumnType {
1250 scalar_type: SqlScalarType::Int64,
1251 nullable: false,
1252 };
1253 let str_col = SqlColumnType {
1254 scalar_type: SqlScalarType::String,
1255 nullable: true,
1256 };
1257 assert_eq!(cell_from_token("7", &int_col).unwrap(), Cell::Int64(7));
1258 assert_eq!(
1259 cell_from_token("hi", &str_col).unwrap(),
1260 Cell::Str("hi".to_string())
1261 );
1262 // A quoted string keeps its contents; the quotes are stripped.
1263 assert_eq!(
1264 cell_from_token("\"a b\"", &str_col).unwrap(),
1265 Cell::Str("a b".to_string())
1266 );
1267 assert_eq!(cell_from_token("null", &str_col).unwrap(), Cell::Null);
1268 // null into a non-nullable column is an error.
1269 assert!(cell_from_token("null", &int_col).is_err());
1270 // a non-numeric token in an int column is an error.
1271 assert!(cell_from_token("x", &int_col).is_err());
1272 }
1273}