Skip to main content

mz_deploy/cli/commands/
test.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Test command — run unit tests against the database.
11//!
12//! Executes `EXECUTE UNIT TEST` statements attached to project objects
13//! against a Materialize instance spun up in Docker. The pipeline:
14//!
15//! 1. **Load** — Compile the project and optionally load/generate type metadata.
16//! 2. **Filter** — Select tests matching the user's filter pattern (supports
17//!    `database.schema.object#test_name` with wildcards at any level).
18//! 3. **Connect** — Establish a database connection to the target environment.
19//! 4. **Execute** — For each test: locate the target view, desugar the test
20//!    into SQL, execute setup statements, run the assertion query, and classify
21//!    the result.
22//! 5. **Report** — Print pass/fail output and, when requested, produce a
23//!    JUnit XML report.
24//!
25//! ## Outcome Classification
26//!
27//! - **Passed** — Assertion query returned zero rows (no mismatches).
28//! - **Failed** — Assertion query returned rows (missing or unexpected data)
29//!   or a runtime error occurred during execution.
30//! - **ValidationFailed** — Test definition is invalid (bad target, malformed
31//!   `AT TIME`, etc.). Tracked separately so summary output distinguishes
32//!   broken definitions from runtime failures.
33
34use crate::cli::CliError;
35use crate::cli::progress;
36use crate::client::Client;
37use crate::config::Settings;
38use crate::docker_runtime::{DockerRuntime, DockerRuntimeError};
39use crate::project::compiler::cache::ProjectCache;
40use crate::project::ir::compiled::FullyQualifiedName;
41use crate::project::ir::graph::Project;
42use crate::project::ir::object_id::ObjectId;
43use crate::project::ir::unit_test;
44use crate::project::{self};
45use crate::types::{self, Types};
46
47mod lower;
48
49use crate::{info, info_nonl, verbose};
50pub(crate) use lower::TestValidationError;
51use owo_colors::{OwoColorize, Stream, Style};
52use serde::Serialize;
53use std::collections::{BTreeMap, BTreeSet};
54use std::fmt::Write;
55use std::fs::File;
56use std::path::Path;
57use std::time::Instant;
58use time::Duration;
59
60/// Aggregate pass/fail counts for the test run.
61#[derive(Serialize)]
62struct TestSummary {
63    passed: usize,
64    failed: usize,
65    validation_failed: usize,
66}
67
68/// Filter to select a subset of tests to run.
69///
70/// Supports patterns like:
71/// - `database.*` — all tests in a database
72/// - `database.schema.*` — all tests in a schema
73/// - `database.schema.object` — all tests for an object
74/// - `database.schema.object#test_name` — a single named test
75struct TestFilter {
76    database: Option<String>,
77    schema: Option<String>,
78    object: Option<String>,
79    test_name: Option<String>,
80}
81
82impl TestFilter {
83    fn parse(filter: &str) -> Self {
84        let (object_part, test_name) = match filter.split_once('#') {
85            Some((left, name)) => (left, Some(name.to_string())),
86            None => (filter, None),
87        };
88
89        let segments: Vec<&str> = object_part.split('.').collect();
90        let (database, schema, object) = match segments.as_slice() {
91            [db, schema, obj] => {
92                let db = if *db == "*" {
93                    None
94                } else {
95                    Some(db.to_string())
96                };
97                let schema = if *schema == "*" {
98                    None
99                } else {
100                    Some(schema.to_string())
101                };
102                let obj = if *obj == "*" {
103                    None
104                } else {
105                    Some(obj.to_string())
106                };
107                (db, schema, obj)
108            }
109            [db, schema_or_star] => {
110                let db = if *db == "*" {
111                    None
112                } else {
113                    Some(db.to_string())
114                };
115                let schema = if *schema_or_star == "*" {
116                    None
117                } else {
118                    Some(schema_or_star.to_string())
119                };
120                (db, schema, None)
121            }
122            [db_or_star] => {
123                let db = if *db_or_star == "*" {
124                    None
125                } else {
126                    Some(db_or_star.to_string())
127                };
128                (db, None, None)
129            }
130            _ => (None, None, None),
131        };
132
133        TestFilter {
134            database,
135            schema,
136            object,
137            test_name,
138        }
139    }
140
141    fn matches(&self, object_id: &ObjectId, test: &unit_test::UnitTest) -> bool {
142        if let Some(ref db) = self.database {
143            if Some(db.as_str()) != object_id.database() {
144                return false;
145            }
146        }
147        if let Some(ref schema) = self.schema {
148            if schema != object_id.schema() {
149                return false;
150            }
151        }
152        if let Some(ref obj) = self.object {
153            if obj != object_id.object() {
154                return false;
155            }
156        }
157        if let Some(ref name) = self.test_name {
158            if name != &test.name {
159                return false;
160            }
161        }
162        true
163    }
164}
165
166/// Final classification for a single test invocation.
167///
168/// `ValidationFailed` is tracked separately from execution failures so summary output
169/// can distinguish broken test definitions from assertion/runtime failures.
170#[derive(Serialize)]
171enum TestOutcome {
172    Passed,
173    Failed(ExecutionFailure),
174    ValidationFailed(ValidationFailure),
175}
176
177/// Pre-execution validation failure in a test definition.
178#[derive(Serialize)]
179enum ValidationFailure {
180    UnitTest(TestValidationError),
181    AtTime(lower::InvalidAtTimeError),
182}
183
184#[derive(Serialize)]
185enum ExecutionFailure {
186    /// Setup or query execution error.
187    Error(String),
188    /// Test assertion mismatch with raw row data for structured reporting.
189    /// Display output is formatted on demand from these fields.
190    AssertionFailed {
191        columns: Vec<String>,
192        missing: Vec<BTreeMap<String, String>>,
193        unexpected: Vec<BTreeMap<String, String>>,
194    },
195}
196
197/// Per-test result captured during execution and used to build the JUnit
198/// report. Serializable so the full result set can be emitted structurally
199/// (e.g. for future machine-readable reporting).
200#[derive(Serialize)]
201struct TestResultEntry {
202    name: String,
203    object_id: ObjectId,
204    status: TestOutcome,
205    elapsed_ms: u64,
206}
207
208impl TestResultEntry {
209    fn new(
210        name: &str,
211        object_id: &ObjectId,
212        elapsed: Duration,
213        status: TestOutcome,
214    ) -> TestResultEntry {
215        let elapsed_ms = u64::try_from(elapsed.whole_milliseconds().max(0)).unwrap_or(0);
216        Self {
217            name: name.to_string(),
218            object_id: object_id.clone(),
219            status,
220            elapsed_ms,
221        }
222    }
223
224    /// Render this entry as a JUnit `TestCase`. The failure message for
225    /// assertion mismatches is regenerated from the structured row data.
226    fn to_junit_test_case(&self) -> junit_report::TestCase {
227        let elapsed = Duration::milliseconds(i64::try_from(self.elapsed_ms).unwrap_or(i64::MAX));
228        let mut test_case = match &self.status {
229            TestOutcome::Passed => junit_report::TestCase::success(&self.name, elapsed),
230            TestOutcome::Failed(ExecutionFailure::AssertionFailed {
231                columns,
232                missing,
233                unexpected,
234            }) => {
235                let msg = format_assertion_rows_for_junit(columns, missing, unexpected);
236                junit_report::TestCase::failure(&self.name, elapsed, "assertion", &msg)
237            }
238            TestOutcome::Failed(ExecutionFailure::Error(message)) => {
239                junit_report::TestCase::failure(
240                    &self.name,
241                    elapsed,
242                    "assertion",
243                    &message.replace('\n', "&#10;"),
244                )
245            }
246            TestOutcome::ValidationFailed(failure) => {
247                let message = match failure {
248                    ValidationFailure::UnitTest(e) => e.to_string(),
249                    ValidationFailure::AtTime(e) => e.to_string(),
250                };
251                junit_report::TestCase::failure(
252                    &self.name,
253                    elapsed,
254                    "validation",
255                    &message.replace('\n', "&#10;"),
256                )
257            }
258        };
259        test_case.set_classname(&self.object_id.to_string());
260        test_case.set_filepath(&format!(
261            "models/{}/{}/{}.sql",
262            self.object_id.expect_database(),
263            self.object_id.schema(),
264            self.object_id.object()
265        ));
266        test_case
267    }
268}
269
270/// Aggregated results of a test run. Acts as the canonical intermediate form
271/// from which the JUnit XML report is derived, and is kept serializable so the
272/// same shape can back a structured JSON report in the future.
273#[derive(Serialize)]
274struct TestResults {
275    results: Vec<TestResultEntry>,
276    summary: TestSummary,
277}
278
279impl TestResults {
280    /// Build a JUnit XML report by grouping entries into one `TestSuite` per
281    /// target `object_id`.
282    fn to_junit_report(&self) -> junit_report::Report {
283        let mut suites: BTreeMap<String, junit_report::TestSuite> = BTreeMap::new();
284        for entry in &self.results {
285            suites
286                .entry(entry.object_id.to_string())
287                .or_insert_with(|| junit_report::TestSuite::new(&entry.object_id.to_string()))
288                .add_testcase(entry.to_junit_test_case());
289        }
290        junit_report::ReportBuilder::new()
291            .add_testsuites(suites.into_values())
292            .build()
293    }
294}
295
296/// Run unit tests against the database.
297///
298/// Delegates test execution to `run_tests`, then owns all presentation:
299/// printing per-test outcomes, printing the summary line, and optionally
300/// writing a JUnit XML report.
301///
302/// # Test statement syntax
303///
304/// Tests are authored as SQL statements attached to the object under test:
305///
306/// ```sql
307/// EXECUTE UNIT TEST <name>
308///   FOR <target>
309///   [AT TIME <expr>]
310///   [MOCK <mock_view>(<columns>) AS <query>]*
311///   EXPECTED <expected_result>
312/// ```
313///
314/// A test passes when the derived assertion query returns no rows. Rows are
315/// returned when:
316/// - Expected rows are MISSING from the actual results
317/// - Unexpected rows appear in the actual results
318///
319/// # Arguments
320/// * `settings` - Resolved CLI settings (project directory, profile, Docker
321///   image, etc.)
322/// * `filter` - Optional `database.schema.object#test_name` pattern; `*`
323///   and omitted trailing segments act as wildcards
324/// * `junit_xml` - Optional path to write a JUnit XML report to
325///
326/// # Returns
327/// `Ok(())` if all executed tests pass (including when there were no tests
328/// to run and no filter was supplied).
329///
330/// # Errors
331/// - [`CliError::TestsFailed`] if any test failed or failed validation
332/// - [`CliError::TestsFilterMissed`] if `filter` matched no tests
333/// - Project planning, type-check, or JUnit I/O errors are propagated as
334///   [`CliError::Message`] or the corresponding variant
335/// Connection source for executing unit tests: an ephemeral Docker container
336/// (the default) or the profile's configured Materialize region (`--no-docker`).
337/// Where a single test runs. Both variants yield a *fresh* per-test connection
338/// in `run_single_test`: Docker spins up (or reuses) an ephemeral container and
339/// connects without the server-cluster pin; Profile opens a new pinned
340/// connection to the profile's region. A per-test connection keeps tests
341/// isolated — its session vars and `CREATE TEMPORARY VIEW` objects are discarded
342/// when the connection is dropped — without relying on in-session cleanup.
343enum TestTarget<'a> {
344    Docker(&'a DockerRuntime),
345    Profile(&'a Settings),
346}
347
348pub async fn run(
349    settings: &Settings,
350    filter: Option<&str>,
351    junit_xml: Option<&Path>,
352    overlay: Option<&Path>,
353    no_docker: bool,
354) -> Result<(), CliError> {
355    let results = run_tests(settings, filter, overlay, no_docker).await?;
356    let test_results = match results {
357        Some(results) => results,
358        None => {
359            progress::success("No tests found");
360            return Ok(());
361        }
362    };
363
364    for entry in &test_results.results {
365        print_test_outcome(&entry.name, &entry.status);
366    }
367
368    print_summary(&test_results.summary);
369
370    if let Some(path) = junit_xml {
371        let report = test_results.to_junit_report();
372        let mut file = File::create(path)
373            .map_err(|e| CliError::Message(format!("failed to create JUnit XML file: {}", e)))?;
374        report
375            .write_xml(&mut file)
376            .map_err(|e| CliError::Message(format!("failed to write JUnit XML report: {}", e)))?;
377    }
378
379    let summary = &test_results.summary;
380    let total_failed = summary.failed + summary.validation_failed;
381    if total_failed > 0 {
382        return Err(CliError::TestsFailed {
383            failed: total_failed,
384            passed: summary.passed,
385        });
386    }
387
388    Ok(())
389}
390
391async fn run_tests(
392    settings: &Settings,
393    filter: Option<&str>,
394    overlay: Option<&Path>,
395    no_docker: bool,
396) -> Result<Option<TestResults>, CliError> {
397    let directory = &settings.directory;
398    let fs = match overlay {
399        Some(p) => crate::fs::FileSystem::from_overlay_file(p).map_err(|e| {
400            CliError::Message(format!("failed to load overlay {}: {}", p.display(), e))
401        })?,
402        None => crate::fs::FileSystem::new(),
403    };
404    let planned_project = project::plan(
405        directory.clone(),
406        settings.profile_name.clone(),
407        settings.profile_suffix().map(|s| s.to_owned()),
408        settings.variables().clone(),
409        fs,
410    )
411    .await?;
412    let empty_types = Types::default();
413    let runtime = DockerRuntime::new().with_image(&settings.docker_image);
414    // With `--no-docker`, each test runs against the profile's region instead of
415    // an ephemeral container. Like the Docker path, every test gets its own
416    // connection (see `TestTarget`), so a test's temporary objects and any
417    // session-var changes can't leak into the next test.
418    let test_filter = filter.map(TestFilter::parse);
419
420    if planned_project.tests.is_empty() {
421        return Ok(None);
422    }
423
424    let types_lock = types::load_types_lock(directory).unwrap_or_default();
425    let types_cache = load_or_generate_types_cache(settings, &planned_project)?;
426
427    let mut test_entries: Vec<TestResultEntry> = Vec::new();
428    let (mut passed_tests, mut failed_tests, mut validation_failed) = (0, 0, 0);
429    for (object_id, test) in &planned_project.tests {
430        if let Some(ref f) = test_filter {
431            if !f.matches(object_id, test) {
432                continue;
433            }
434        }
435        let target = if no_docker {
436            TestTarget::Profile(settings)
437        } else {
438            TestTarget::Docker(&runtime)
439        };
440        let start_time = Instant::now();
441        let outcome = run_single_test(
442            &planned_project,
443            object_id,
444            test,
445            &types_cache,
446            &types_lock,
447            target,
448            &empty_types,
449        )
450        .await?;
451
452        let elapsed = Duration::try_from(start_time.elapsed()).unwrap_or(Duration::ZERO);
453
454        match &outcome {
455            TestOutcome::Passed => passed_tests += 1,
456            TestOutcome::Failed(_) => failed_tests += 1,
457            TestOutcome::ValidationFailed(_) => validation_failed += 1,
458        }
459
460        test_entries.push(TestResultEntry::new(
461            &test.name, object_id, elapsed, outcome,
462        ));
463    }
464
465    let total_run = passed_tests + failed_tests + validation_failed;
466    if total_run == 0 {
467        if let Some(f) = filter {
468            return Err(CliError::TestsFilterMissed { filter: f.into() });
469        }
470        return Ok(None);
471    }
472
473    Ok(Some(TestResults {
474        results: test_entries,
475        summary: TestSummary {
476            passed: passed_tests,
477            failed: failed_tests,
478            validation_failed,
479        },
480    }))
481}
482
483/// Executes one test case through validation, setup SQL, assertion query, and cleanup.
484///
485/// Returns a `TestOutcome` without performing any terminal output so the caller
486/// can own all presentation (printing, JUnit building, counting).
487async fn run_single_test(
488    planned_project: &Project,
489    object_id: &ObjectId,
490    test: &unit_test::UnitTest,
491    types_cache: &Option<ProjectCache>,
492    types_lock: &Types,
493    target: TestTarget<'_>,
494    empty_types: &Types,
495) -> Result<TestOutcome, CliError> {
496    let dependencies = planned_project
497        .dependency_graph
498        .get(object_id)
499        .cloned()
500        .unwrap_or_else(BTreeSet::new);
501
502    let get_columns = |id: &ObjectId| -> Option<BTreeMap<String, types::ColumnType>> {
503        types_cache
504            .as_ref()
505            .and_then(|tc| tc.get_columns(id))
506            .or_else(|| types_lock.get_table(id).cloned())
507    };
508
509    if let Err(e) = lower::validate_unit_test(test, object_id, &get_columns, &dependencies) {
510        return Ok(TestOutcome::ValidationFailed(ValidationFailure::UnitTest(
511            e,
512        )));
513    }
514
515    // Each test gets its own connection so its temporary objects and session
516    // state die with the connection, keeping tests isolated.
517    let owned_client = match target {
518        TestTarget::Profile(settings) => {
519            Client::connect_with_profile(settings.connection().clone())
520                .await
521                .map_err(CliError::Connection)?
522        }
523        TestTarget::Docker(runtime) => runtime_client(runtime, empty_types).await?,
524    };
525    let client: &Client = &owned_client;
526    if let Err(e) = validate_at_time(client, test).await? {
527        return Ok(TestOutcome::ValidationFailed(ValidationFailure::AtTime(e)));
528    }
529
530    let Some(target_obj) = planned_project.find_object(object_id) else {
531        return Ok(TestOutcome::Failed(ExecutionFailure::Error(format!(
532            "target object '{}' not found in project",
533            object_id
534        ))));
535    };
536
537    let typed_fqn: FullyQualifiedName = object_id.clone().into();
538    let sql_statements = lower::lower_unit_test(test, &target_obj.typed_object.stmt, &typed_fqn)
539        .map_err(|reason| CliError::InvalidUnitTestTarget {
540            test_name: test.name.clone(),
541            object_id: object_id.to_string(),
542            reason,
543        })?;
544
545    for (i, sql) in sql_statements[..sql_statements.len() - 1]
546        .iter()
547        .enumerate()
548    {
549        verbose!("executing: {}", sql);
550        if let Err(e) = client.batch_execute(sql).await {
551            let source = if i < test.mocks.len() {
552                format!("MOCK {}", test.mocks[i].fqn)
553            } else if i == test.mocks.len() {
554                "EXPECTED".to_string()
555            } else {
556                format!("target view '{}'", test.target_view)
557            };
558            return Ok(TestOutcome::Failed(ExecutionFailure::Error(format!(
559                "failed to execute {}: {}",
560                source, e
561            ))));
562        }
563    }
564
565    let test_query = &sql_statements[sql_statements.len() - 1];
566    let outcome = match client.simple_query(test_query).await {
567        Ok(messages) => {
568            let rows: Vec<_> = messages
569                .into_iter()
570                .filter_map(|m| match m {
571                    tokio_postgres::SimpleQueryMessage::Row(row) => Some(row),
572                    _ => None,
573                })
574                .collect();
575            if rows.is_empty() {
576                TestOutcome::Passed
577            } else {
578                let (columns, missing, unexpected) = extract_assertion_data(&rows);
579                TestOutcome::Failed(ExecutionFailure::AssertionFailed {
580                    columns,
581                    missing,
582                    unexpected,
583                })
584            }
585        }
586        Err(e) => TestOutcome::Failed(ExecutionFailure::Error(format!(
587            "failed to execute test query: {}",
588            e
589        ))),
590    };
591
592    // No explicit cleanup: `owned_client` is dropped when this function returns,
593    // ending the session and discarding its TEMPORARY views. This holds on every
594    // exit path (including the early returns above on setup failure), so a broken
595    // test can't leak fixed-name temp views into the next one.
596    Ok(outcome)
597}
598
599/// Prints the test summary line showing pass/fail counts.
600fn print_summary(summary: &TestSummary) {
601    let total_failed = summary.failed + summary.validation_failed;
602    info_nonl!(
603        "\n{}: ",
604        "test result".if_supports_color(Stream::Stderr, |t| t.bold())
605    );
606    if total_failed == 0 {
607        let style = Style::new().green().bold();
608        info_nonl!(
609            "{}. ",
610            "ok".if_supports_color(Stream::Stderr, |t| style.style(t))
611        );
612    } else {
613        let style = Style::new().red().bold();
614        info_nonl!(
615            "{}. ",
616            "FAILED".if_supports_color(Stream::Stderr, |t| style.style(t))
617        );
618    }
619    info_nonl!(
620        "{}; ",
621        format!("{} passed", summary.passed).if_supports_color(Stream::Stderr, |t| t.green())
622    );
623    if summary.failed > 0 {
624        info_nonl!(
625            "{}; ",
626            format!("{} failed", summary.failed).if_supports_color(Stream::Stderr, |t| t.red())
627        );
628    } else {
629        info_nonl!("{} failed; ", summary.failed);
630    }
631    if summary.validation_failed > 0 {
632        info!(
633            "{}",
634            format!("{} validation errors", summary.validation_failed)
635                .if_supports_color(Stream::Stderr, |t| t.red())
636        );
637    } else {
638        info!("{} validation errors", summary.validation_failed);
639    }
640}
641
642/// Prints the complete status line and any detail output for a single test outcome.
643fn print_test_outcome(name: &str, outcome: &TestOutcome) {
644    match outcome {
645        TestOutcome::Passed => {
646            let ok_style = Style::new().green().bold();
647            info!(
648                "{} {} ... {}",
649                "test".if_supports_color(Stream::Stderr, |t| t.cyan()),
650                name.if_supports_color(Stream::Stderr, |t| t.cyan()),
651                "ok".if_supports_color(Stream::Stderr, |t| ok_style.style(t))
652            );
653        }
654        TestOutcome::ValidationFailed(failure) => {
655            let fail_style = Style::new().red().bold();
656            info!(
657                "{} {} ... {}",
658                "test".if_supports_color(Stream::Stderr, |t| t.cyan()),
659                name.if_supports_color(Stream::Stderr, |t| t.cyan()),
660                "VALIDATION FAILED".if_supports_color(Stream::Stderr, |t| fail_style.style(t))
661            );
662            match failure {
663                ValidationFailure::UnitTest(e) => print_test_validation_error(e),
664                ValidationFailure::AtTime(e) => {
665                    info!("{}", e)
666                }
667            }
668        }
669        TestOutcome::Failed(failure) => {
670            let fail_style = Style::new().red().bold();
671            info!(
672                "{} {} ... {}",
673                "test".if_supports_color(Stream::Stderr, |t| t.cyan()),
674                name.if_supports_color(Stream::Stderr, |t| t.cyan()),
675                "FAILED".if_supports_color(Stream::Stderr, |t| fail_style.style(t))
676            );
677            match failure {
678                ExecutionFailure::AssertionFailed {
679                    columns,
680                    missing,
681                    unexpected,
682                } => {
683                    info_nonl!("{}", format_assertion_rows(columns, missing, unexpected))
684                }
685                ExecutionFailure::Error(msg) => {
686                    let err_style = Style::new().red().bold();
687                    info!(
688                        "  {}: {}",
689                        "error".if_supports_color(Stream::Stderr, |t| err_style.style(t)),
690                        msg
691                    )
692                }
693            }
694        }
695    }
696}
697
698/// Renders validation failures in the standard test output format.
699fn print_test_validation_error(error: &TestValidationError) {
700    match error {
701        TestValidationError::UnmockedDependency(inner) => {
702            info!("{}", inner)
703        }
704        TestValidationError::MockSchemaMismatch(inner) => {
705            info!("{}", inner)
706        }
707        TestValidationError::ExpectedSchemaMismatch(inner) => {
708            info!("{}", inner)
709        }
710        TestValidationError::InvalidAtTime(inner) => {
711            info!("{}", inner)
712        }
713        TestValidationError::TypesCacheUnavailable { reason } => {
714            let style = Style::new().bright_red().bold();
715            info!(
716                "{}: types cache unavailable: {}",
717                "error".if_supports_color(Stream::Stderr, |t| style.style(t)),
718                reason
719            );
720        }
721    }
722}
723
724/// Creates an isolated runtime client for test execution.
725///
726/// Converts runtime startup failures into user-facing CLI messages with actionable wording.
727async fn runtime_client(runtime: &DockerRuntime, _empty_types: &Types) -> Result<Client, CliError> {
728    match runtime.get_client().await {
729        Ok(client) => Ok(client),
730        Err(DockerRuntimeError::ContainerStartFailed(e)) => Err(CliError::Message(format!(
731            "Docker not available for running tests: {}",
732            e
733        ))),
734        Err(e) => Err(CliError::Message(format!(
735            "Failed to start test environment: {}",
736            e
737        ))),
738    }
739}
740
741/// Pre-validates optional `AT TIME` test expressions against `mz_timestamp` casting.
742///
743/// Returns `Ok(Ok(()))` when valid, `Ok(Err(InvalidAtTimeError))` on
744/// validation failure, and `Err(CliError)` on connection errors.
745async fn validate_at_time(
746    client: &Client,
747    test: &unit_test::UnitTest,
748) -> Result<Result<(), lower::InvalidAtTimeError>, CliError> {
749    if let Some(at_time) = &test.at_time {
750        let validation_query = format!("SELECT {}::mz_timestamp", at_time);
751        if let Err(e) = client.simple_query(&validation_query).await {
752            let error = lower::InvalidAtTimeError {
753                test_name: test.name.clone(),
754                at_time_value: at_time.clone(),
755                db_error: e.to_string(),
756            };
757            return Ok(Err(error));
758        }
759    }
760    Ok(Ok(()))
761}
762
763/// Formats failing assertion rows into a readable, ANSI-colored table for
764/// terminal output.
765///
766/// Display-only; does not affect pass/fail decisions.
767fn format_assertion_rows(
768    columns: &[String],
769    missing: &[BTreeMap<String, String>],
770    unexpected: &[BTreeMap<String, String>],
771) -> String {
772    let mut out = String::new();
773    let title_style = Style::new().yellow().bold();
774    writeln!(
775        out,
776        "  {}:",
777        "Test assertion failed".if_supports_color(Stream::Stderr, |t| title_style.style(t))
778    )
779    .unwrap();
780    if missing.is_empty() && unexpected.is_empty() {
781        return out;
782    }
783    let header = std::iter::once("status".to_string())
784        .chain(columns.iter().cloned())
785        .collect::<Vec<_>>()
786        .join(" | ");
787    let header_style = Style::new().bold().cyan();
788    writeln!(
789        out,
790        "  {}",
791        header.if_supports_color(Stream::Stderr, |t| header_style.style(t))
792    )
793    .unwrap();
794    writeln!(
795        out,
796        "  {}",
797        "-".repeat(header.len())
798            .if_supports_color(Stream::Stderr, |t| t.cyan())
799    )
800    .unwrap();
801
802    let groups: [(&str, &[BTreeMap<String, String>]); 2] =
803        [("MISSING", missing), ("UNEXPECTED", unexpected)];
804    for (status, rows) in groups {
805        let status_colored = match status {
806            "MISSING" => {
807                let style = Style::new().red().bold();
808                status
809                    .if_supports_color(Stream::Stderr, |t| style.style(t))
810                    .to_string()
811            }
812            "UNEXPECTED" => {
813                let style = Style::new().yellow().bold();
814                status
815                    .if_supports_color(Stream::Stderr, |t| style.style(t))
816                    .to_string()
817            }
818            _ => status.to_string(),
819        };
820        for row in rows {
821            let mut values = vec![status_colored.clone()];
822            for col_name in columns {
823                values.push(
824                    row.get(col_name)
825                        .cloned()
826                        .unwrap_or_else(|| "<null>".to_string()),
827                );
828            }
829            writeln!(out, "  {}", values.join(" | ")).unwrap();
830        }
831    }
832    out
833}
834
/// Renders assertion mismatches as plain text for the JUnit XML report.
///
/// Each non-empty group (MISSING first, then UNEXPECTED) gets a heading line
/// followed by one indented line per row of `column=value` pairs in `columns`
/// order. Groups are separated by a blank line. A row with no entry for a
/// column shows `<null>`. Returns an empty string when both groups are empty.
fn format_assertion_rows_for_junit(
    columns: &[String],
    missing: &[BTreeMap<String, String>],
    unexpected: &[BTreeMap<String, String>],
) -> String {
    let sections = [
        ("MISSING", "expected but not produced", missing),
        ("UNEXPECTED", "produced but not expected", unexpected),
    ];

    let mut report = String::new();
    for (label, explanation, rows) in sections {
        if rows.is_empty() {
            continue;
        }
        // Anything already in the buffer means an earlier group was written,
        // so separate this one with a blank line.
        if !report.is_empty() {
            report.push('\n');
        }
        report.push_str(&format!("{} rows ({}):\n", label, explanation));

        for row in rows {
            let rendered = columns
                .iter()
                .map(|column| {
                    let cell = row.get(column).map_or("<null>", String::as_str);
                    format!("{}={}", column, cell)
                })
                .collect::<Vec<_>>()
                .join(", ");
            report.push_str("  ");
            report.push_str(&rendered);
            report.push('\n');
        }
    }
    report
}
873
874/// Extracts structured assertion data from failing query rows.
875///
876/// Column 0 is the status (`MISSING` or `UNEXPECTED`); remaining columns are
877/// data columns. Returns `(column_names, missing_rows, unexpected_rows)`.
878fn extract_assertion_data(
879    rows: &[tokio_postgres::SimpleQueryRow],
880) -> (
881    Vec<String>,
882    Vec<BTreeMap<String, String>>,
883    Vec<BTreeMap<String, String>>,
884) {
885    if rows.is_empty() {
886        return (Vec::new(), Vec::new(), Vec::new());
887    }
888
889    let columns: Vec<String> = rows[0]
890        .columns()
891        .iter()
892        .skip(1)
893        .map(|col| col.name().to_string())
894        .collect();
895
896    let mut missing = Vec::new();
897    let mut unexpected = Vec::new();
898
899    for row in rows {
900        let status = row.get(0).unwrap_or("UNKNOWN");
901        let mut map = BTreeMap::new();
902        for (i, col_name) in columns.iter().enumerate() {
903            let value = row.get(i + 1).unwrap_or("<null>").to_string();
904            map.insert(col_name.clone(), value);
905        }
906        match status {
907            "MISSING" => missing.push(map),
908            "UNEXPECTED" => unexpected.push(map),
909            _ => unexpected.push(map),
910        }
911    }
912
913    (columns, missing, unexpected)
914}
915
916/// Load or generate type metadata needed for test execution.
917///
918/// Reuses previously validated schemas when possible, re-validating only
919/// objects whose definitions have changed. Returns a project cache handle
920/// for resolving column metadata during test runs.
921fn load_or_generate_types_cache(
922    settings: &Settings,
923    planned_project: &Project,
924) -> Result<Option<ProjectCache>, CliError> {
925    use crate::project::compiler::typecheck;
926
927    let directory = &settings.directory;
928    let external_types = types::load_types_lock(directory).unwrap_or_default();
929
930    typecheck::run(
931        directory,
932        settings.profile_name().unwrap_or(""),
933        settings.profile_suffix(),
934        settings.variables(),
935        planned_project,
936        external_types,
937    )
938    .map_err(CliError::TypeCheckFailed)?;
939
940    ProjectCache::open(
941        directory,
942        settings.profile_name().unwrap_or(""),
943        settings.profile_suffix(),
944        settings.variables(),
945    )
946    .map_err(|e| CliError::Message(format!("Failed to open types cache: {}", e)))
947}