1use crate::cli::CliError;
35use crate::cli::progress;
36use crate::client::Client;
37use crate::config::Settings;
38use crate::docker_runtime::{DockerRuntime, DockerRuntimeError};
39use crate::project::compiler::cache::ProjectCache;
40use crate::project::ir::compiled::FullyQualifiedName;
41use crate::project::ir::graph::Project;
42use crate::project::ir::object_id::ObjectId;
43use crate::project::ir::unit_test;
44use crate::project::{self};
45use crate::types::{self, Types};
46
47mod lower;
48
49use crate::{info, info_nonl, verbose};
50pub(crate) use lower::TestValidationError;
51use owo_colors::{OwoColorize, Stream, Style};
52use serde::Serialize;
53use std::collections::{BTreeMap, BTreeSet};
54use std::fmt::Write;
55use std::fs::File;
56use std::path::Path;
57use std::time::Instant;
58use time::Duration;
59
60#[derive(Serialize)]
62struct TestSummary {
63 passed: usize,
64 failed: usize,
65 validation_failed: usize,
66}
67
68struct TestFilter {
76 database: Option<String>,
77 schema: Option<String>,
78 object: Option<String>,
79 test_name: Option<String>,
80}
81
82impl TestFilter {
83 fn parse(filter: &str) -> Self {
84 let (object_part, test_name) = match filter.split_once('#') {
85 Some((left, name)) => (left, Some(name.to_string())),
86 None => (filter, None),
87 };
88
89 let segments: Vec<&str> = object_part.split('.').collect();
90 let (database, schema, object) = match segments.as_slice() {
91 [db, schema, obj] => {
92 let db = if *db == "*" {
93 None
94 } else {
95 Some(db.to_string())
96 };
97 let schema = if *schema == "*" {
98 None
99 } else {
100 Some(schema.to_string())
101 };
102 let obj = if *obj == "*" {
103 None
104 } else {
105 Some(obj.to_string())
106 };
107 (db, schema, obj)
108 }
109 [db, schema_or_star] => {
110 let db = if *db == "*" {
111 None
112 } else {
113 Some(db.to_string())
114 };
115 let schema = if *schema_or_star == "*" {
116 None
117 } else {
118 Some(schema_or_star.to_string())
119 };
120 (db, schema, None)
121 }
122 [db_or_star] => {
123 let db = if *db_or_star == "*" {
124 None
125 } else {
126 Some(db_or_star.to_string())
127 };
128 (db, None, None)
129 }
130 _ => (None, None, None),
131 };
132
133 TestFilter {
134 database,
135 schema,
136 object,
137 test_name,
138 }
139 }
140
141 fn matches(&self, object_id: &ObjectId, test: &unit_test::UnitTest) -> bool {
142 if let Some(ref db) = self.database {
143 if Some(db.as_str()) != object_id.database() {
144 return false;
145 }
146 }
147 if let Some(ref schema) = self.schema {
148 if schema != object_id.schema() {
149 return false;
150 }
151 }
152 if let Some(ref obj) = self.object {
153 if obj != object_id.object() {
154 return false;
155 }
156 }
157 if let Some(ref name) = self.test_name {
158 if name != &test.name {
159 return false;
160 }
161 }
162 true
163 }
164}
165
166#[derive(Serialize)]
171enum TestOutcome {
172 Passed,
173 Failed(ExecutionFailure),
174 ValidationFailed(ValidationFailure),
175}
176
177#[derive(Serialize)]
179enum ValidationFailure {
180 UnitTest(TestValidationError),
181 AtTime(lower::InvalidAtTimeError),
182}
183
184#[derive(Serialize)]
185enum ExecutionFailure {
186 Error(String),
188 AssertionFailed {
191 columns: Vec<String>,
192 missing: Vec<BTreeMap<String, String>>,
193 unexpected: Vec<BTreeMap<String, String>>,
194 },
195}
196
197#[derive(Serialize)]
201struct TestResultEntry {
202 name: String,
203 object_id: ObjectId,
204 status: TestOutcome,
205 elapsed_ms: u64,
206}
207
208impl TestResultEntry {
209 fn new(
210 name: &str,
211 object_id: &ObjectId,
212 elapsed: Duration,
213 status: TestOutcome,
214 ) -> TestResultEntry {
215 let elapsed_ms = u64::try_from(elapsed.whole_milliseconds().max(0)).unwrap_or(0);
216 Self {
217 name: name.to_string(),
218 object_id: object_id.clone(),
219 status,
220 elapsed_ms,
221 }
222 }
223
224 fn to_junit_test_case(&self) -> junit_report::TestCase {
227 let elapsed = Duration::milliseconds(i64::try_from(self.elapsed_ms).unwrap_or(i64::MAX));
228 let mut test_case = match &self.status {
229 TestOutcome::Passed => junit_report::TestCase::success(&self.name, elapsed),
230 TestOutcome::Failed(ExecutionFailure::AssertionFailed {
231 columns,
232 missing,
233 unexpected,
234 }) => {
235 let msg = format_assertion_rows_for_junit(columns, missing, unexpected);
236 junit_report::TestCase::failure(&self.name, elapsed, "assertion", &msg)
237 }
238 TestOutcome::Failed(ExecutionFailure::Error(message)) => {
239 junit_report::TestCase::failure(
240 &self.name,
241 elapsed,
242 "assertion",
243 &message.replace('\n', " "),
244 )
245 }
246 TestOutcome::ValidationFailed(failure) => {
247 let message = match failure {
248 ValidationFailure::UnitTest(e) => e.to_string(),
249 ValidationFailure::AtTime(e) => e.to_string(),
250 };
251 junit_report::TestCase::failure(
252 &self.name,
253 elapsed,
254 "validation",
255 &message.replace('\n', " "),
256 )
257 }
258 };
259 test_case.set_classname(&self.object_id.to_string());
260 test_case.set_filepath(&format!(
261 "models/{}/{}/{}.sql",
262 self.object_id.expect_database(),
263 self.object_id.schema(),
264 self.object_id.object()
265 ));
266 test_case
267 }
268}
269
270#[derive(Serialize)]
274struct TestResults {
275 results: Vec<TestResultEntry>,
276 summary: TestSummary,
277}
278
279impl TestResults {
280 fn to_junit_report(&self) -> junit_report::Report {
283 let mut suites: BTreeMap<String, junit_report::TestSuite> = BTreeMap::new();
284 for entry in &self.results {
285 suites
286 .entry(entry.object_id.to_string())
287 .or_insert_with(|| junit_report::TestSuite::new(&entry.object_id.to_string()))
288 .add_testcase(entry.to_junit_test_case());
289 }
290 junit_report::ReportBuilder::new()
291 .add_testsuites(suites.into_values())
292 .build()
293 }
294}
295
296enum TestTarget<'a> {
344 Docker(&'a DockerRuntime),
345 Profile(&'a Settings),
346}
347
348pub async fn run(
349 settings: &Settings,
350 filter: Option<&str>,
351 junit_xml: Option<&Path>,
352 overlay: Option<&Path>,
353 no_docker: bool,
354) -> Result<(), CliError> {
355 let results = run_tests(settings, filter, overlay, no_docker).await?;
356 let test_results = match results {
357 Some(results) => results,
358 None => {
359 progress::success("No tests found");
360 return Ok(());
361 }
362 };
363
364 for entry in &test_results.results {
365 print_test_outcome(&entry.name, &entry.status);
366 }
367
368 print_summary(&test_results.summary);
369
370 if let Some(path) = junit_xml {
371 let report = test_results.to_junit_report();
372 let mut file = File::create(path)
373 .map_err(|e| CliError::Message(format!("failed to create JUnit XML file: {}", e)))?;
374 report
375 .write_xml(&mut file)
376 .map_err(|e| CliError::Message(format!("failed to write JUnit XML report: {}", e)))?;
377 }
378
379 let summary = &test_results.summary;
380 let total_failed = summary.failed + summary.validation_failed;
381 if total_failed > 0 {
382 return Err(CliError::TestsFailed {
383 failed: total_failed,
384 passed: summary.passed,
385 });
386 }
387
388 Ok(())
389}
390
391async fn run_tests(
392 settings: &Settings,
393 filter: Option<&str>,
394 overlay: Option<&Path>,
395 no_docker: bool,
396) -> Result<Option<TestResults>, CliError> {
397 let directory = &settings.directory;
398 let fs = match overlay {
399 Some(p) => crate::fs::FileSystem::from_overlay_file(p).map_err(|e| {
400 CliError::Message(format!("failed to load overlay {}: {}", p.display(), e))
401 })?,
402 None => crate::fs::FileSystem::new(),
403 };
404 let planned_project = project::plan(
405 directory.clone(),
406 settings.profile_name.clone(),
407 settings.profile_suffix().map(|s| s.to_owned()),
408 settings.variables().clone(),
409 fs,
410 )
411 .await?;
412 let empty_types = Types::default();
413 let runtime = DockerRuntime::new().with_image(&settings.docker_image);
414 let test_filter = filter.map(TestFilter::parse);
419
420 if planned_project.tests.is_empty() {
421 return Ok(None);
422 }
423
424 let types_lock = types::load_types_lock(directory).unwrap_or_default();
425 let types_cache = load_or_generate_types_cache(settings, &planned_project)?;
426
427 let mut test_entries: Vec<TestResultEntry> = Vec::new();
428 let (mut passed_tests, mut failed_tests, mut validation_failed) = (0, 0, 0);
429 for (object_id, test) in &planned_project.tests {
430 if let Some(ref f) = test_filter {
431 if !f.matches(object_id, test) {
432 continue;
433 }
434 }
435 let target = if no_docker {
436 TestTarget::Profile(settings)
437 } else {
438 TestTarget::Docker(&runtime)
439 };
440 let start_time = Instant::now();
441 let outcome = run_single_test(
442 &planned_project,
443 object_id,
444 test,
445 &types_cache,
446 &types_lock,
447 target,
448 &empty_types,
449 )
450 .await?;
451
452 let elapsed = Duration::try_from(start_time.elapsed()).unwrap_or(Duration::ZERO);
453
454 match &outcome {
455 TestOutcome::Passed => passed_tests += 1,
456 TestOutcome::Failed(_) => failed_tests += 1,
457 TestOutcome::ValidationFailed(_) => validation_failed += 1,
458 }
459
460 test_entries.push(TestResultEntry::new(
461 &test.name, object_id, elapsed, outcome,
462 ));
463 }
464
465 let total_run = passed_tests + failed_tests + validation_failed;
466 if total_run == 0 {
467 if let Some(f) = filter {
468 return Err(CliError::TestsFilterMissed { filter: f.into() });
469 }
470 return Ok(None);
471 }
472
473 Ok(Some(TestResults {
474 results: test_entries,
475 summary: TestSummary {
476 passed: passed_tests,
477 failed: failed_tests,
478 validation_failed,
479 },
480 }))
481}
482
483async fn run_single_test(
488 planned_project: &Project,
489 object_id: &ObjectId,
490 test: &unit_test::UnitTest,
491 types_cache: &Option<ProjectCache>,
492 types_lock: &Types,
493 target: TestTarget<'_>,
494 empty_types: &Types,
495) -> Result<TestOutcome, CliError> {
496 let dependencies = planned_project
497 .dependency_graph
498 .get(object_id)
499 .cloned()
500 .unwrap_or_else(BTreeSet::new);
501
502 let get_columns = |id: &ObjectId| -> Option<BTreeMap<String, types::ColumnType>> {
503 types_cache
504 .as_ref()
505 .and_then(|tc| tc.get_columns(id))
506 .or_else(|| types_lock.get_table(id).cloned())
507 };
508
509 if let Err(e) = lower::validate_unit_test(test, object_id, &get_columns, &dependencies) {
510 return Ok(TestOutcome::ValidationFailed(ValidationFailure::UnitTest(
511 e,
512 )));
513 }
514
515 let owned_client = match target {
518 TestTarget::Profile(settings) => {
519 Client::connect_with_profile(settings.connection().clone())
520 .await
521 .map_err(CliError::Connection)?
522 }
523 TestTarget::Docker(runtime) => runtime_client(runtime, empty_types).await?,
524 };
525 let client: &Client = &owned_client;
526 if let Err(e) = validate_at_time(client, test).await? {
527 return Ok(TestOutcome::ValidationFailed(ValidationFailure::AtTime(e)));
528 }
529
530 let Some(target_obj) = planned_project.find_object(object_id) else {
531 return Ok(TestOutcome::Failed(ExecutionFailure::Error(format!(
532 "target object '{}' not found in project",
533 object_id
534 ))));
535 };
536
537 let typed_fqn: FullyQualifiedName = object_id.clone().into();
538 let sql_statements = lower::lower_unit_test(test, &target_obj.typed_object.stmt, &typed_fqn)
539 .map_err(|reason| CliError::InvalidUnitTestTarget {
540 test_name: test.name.clone(),
541 object_id: object_id.to_string(),
542 reason,
543 })?;
544
545 for (i, sql) in sql_statements[..sql_statements.len() - 1]
546 .iter()
547 .enumerate()
548 {
549 verbose!("executing: {}", sql);
550 if let Err(e) = client.batch_execute(sql).await {
551 let source = if i < test.mocks.len() {
552 format!("MOCK {}", test.mocks[i].fqn)
553 } else if i == test.mocks.len() {
554 "EXPECTED".to_string()
555 } else {
556 format!("target view '{}'", test.target_view)
557 };
558 return Ok(TestOutcome::Failed(ExecutionFailure::Error(format!(
559 "failed to execute {}: {}",
560 source, e
561 ))));
562 }
563 }
564
565 let test_query = &sql_statements[sql_statements.len() - 1];
566 let outcome = match client.simple_query(test_query).await {
567 Ok(messages) => {
568 let rows: Vec<_> = messages
569 .into_iter()
570 .filter_map(|m| match m {
571 tokio_postgres::SimpleQueryMessage::Row(row) => Some(row),
572 _ => None,
573 })
574 .collect();
575 if rows.is_empty() {
576 TestOutcome::Passed
577 } else {
578 let (columns, missing, unexpected) = extract_assertion_data(&rows);
579 TestOutcome::Failed(ExecutionFailure::AssertionFailed {
580 columns,
581 missing,
582 unexpected,
583 })
584 }
585 }
586 Err(e) => TestOutcome::Failed(ExecutionFailure::Error(format!(
587 "failed to execute test query: {}",
588 e
589 ))),
590 };
591
592 Ok(outcome)
597}
598
599fn print_summary(summary: &TestSummary) {
601 let total_failed = summary.failed + summary.validation_failed;
602 info_nonl!(
603 "\n{}: ",
604 "test result".if_supports_color(Stream::Stderr, |t| t.bold())
605 );
606 if total_failed == 0 {
607 let style = Style::new().green().bold();
608 info_nonl!(
609 "{}. ",
610 "ok".if_supports_color(Stream::Stderr, |t| style.style(t))
611 );
612 } else {
613 let style = Style::new().red().bold();
614 info_nonl!(
615 "{}. ",
616 "FAILED".if_supports_color(Stream::Stderr, |t| style.style(t))
617 );
618 }
619 info_nonl!(
620 "{}; ",
621 format!("{} passed", summary.passed).if_supports_color(Stream::Stderr, |t| t.green())
622 );
623 if summary.failed > 0 {
624 info_nonl!(
625 "{}; ",
626 format!("{} failed", summary.failed).if_supports_color(Stream::Stderr, |t| t.red())
627 );
628 } else {
629 info_nonl!("{} failed; ", summary.failed);
630 }
631 if summary.validation_failed > 0 {
632 info!(
633 "{}",
634 format!("{} validation errors", summary.validation_failed)
635 .if_supports_color(Stream::Stderr, |t| t.red())
636 );
637 } else {
638 info!("{} validation errors", summary.validation_failed);
639 }
640}
641
642fn print_test_outcome(name: &str, outcome: &TestOutcome) {
644 match outcome {
645 TestOutcome::Passed => {
646 let ok_style = Style::new().green().bold();
647 info!(
648 "{} {} ... {}",
649 "test".if_supports_color(Stream::Stderr, |t| t.cyan()),
650 name.if_supports_color(Stream::Stderr, |t| t.cyan()),
651 "ok".if_supports_color(Stream::Stderr, |t| ok_style.style(t))
652 );
653 }
654 TestOutcome::ValidationFailed(failure) => {
655 let fail_style = Style::new().red().bold();
656 info!(
657 "{} {} ... {}",
658 "test".if_supports_color(Stream::Stderr, |t| t.cyan()),
659 name.if_supports_color(Stream::Stderr, |t| t.cyan()),
660 "VALIDATION FAILED".if_supports_color(Stream::Stderr, |t| fail_style.style(t))
661 );
662 match failure {
663 ValidationFailure::UnitTest(e) => print_test_validation_error(e),
664 ValidationFailure::AtTime(e) => {
665 info!("{}", e)
666 }
667 }
668 }
669 TestOutcome::Failed(failure) => {
670 let fail_style = Style::new().red().bold();
671 info!(
672 "{} {} ... {}",
673 "test".if_supports_color(Stream::Stderr, |t| t.cyan()),
674 name.if_supports_color(Stream::Stderr, |t| t.cyan()),
675 "FAILED".if_supports_color(Stream::Stderr, |t| fail_style.style(t))
676 );
677 match failure {
678 ExecutionFailure::AssertionFailed {
679 columns,
680 missing,
681 unexpected,
682 } => {
683 info_nonl!("{}", format_assertion_rows(columns, missing, unexpected))
684 }
685 ExecutionFailure::Error(msg) => {
686 let err_style = Style::new().red().bold();
687 info!(
688 " {}: {}",
689 "error".if_supports_color(Stream::Stderr, |t| err_style.style(t)),
690 msg
691 )
692 }
693 }
694 }
695 }
696}
697
698fn print_test_validation_error(error: &TestValidationError) {
700 match error {
701 TestValidationError::UnmockedDependency(inner) => {
702 info!("{}", inner)
703 }
704 TestValidationError::MockSchemaMismatch(inner) => {
705 info!("{}", inner)
706 }
707 TestValidationError::ExpectedSchemaMismatch(inner) => {
708 info!("{}", inner)
709 }
710 TestValidationError::InvalidAtTime(inner) => {
711 info!("{}", inner)
712 }
713 TestValidationError::TypesCacheUnavailable { reason } => {
714 let style = Style::new().bright_red().bold();
715 info!(
716 "{}: types cache unavailable: {}",
717 "error".if_supports_color(Stream::Stderr, |t| style.style(t)),
718 reason
719 );
720 }
721 }
722}
723
724async fn runtime_client(runtime: &DockerRuntime, _empty_types: &Types) -> Result<Client, CliError> {
728 match runtime.get_client().await {
729 Ok(client) => Ok(client),
730 Err(DockerRuntimeError::ContainerStartFailed(e)) => Err(CliError::Message(format!(
731 "Docker not available for running tests: {}",
732 e
733 ))),
734 Err(e) => Err(CliError::Message(format!(
735 "Failed to start test environment: {}",
736 e
737 ))),
738 }
739}
740
741async fn validate_at_time(
746 client: &Client,
747 test: &unit_test::UnitTest,
748) -> Result<Result<(), lower::InvalidAtTimeError>, CliError> {
749 if let Some(at_time) = &test.at_time {
750 let validation_query = format!("SELECT {}::mz_timestamp", at_time);
751 if let Err(e) = client.simple_query(&validation_query).await {
752 let error = lower::InvalidAtTimeError {
753 test_name: test.name.clone(),
754 at_time_value: at_time.clone(),
755 db_error: e.to_string(),
756 };
757 return Ok(Err(error));
758 }
759 }
760 Ok(Ok(()))
761}
762
763fn format_assertion_rows(
768 columns: &[String],
769 missing: &[BTreeMap<String, String>],
770 unexpected: &[BTreeMap<String, String>],
771) -> String {
772 let mut out = String::new();
773 let title_style = Style::new().yellow().bold();
774 writeln!(
775 out,
776 " {}:",
777 "Test assertion failed".if_supports_color(Stream::Stderr, |t| title_style.style(t))
778 )
779 .unwrap();
780 if missing.is_empty() && unexpected.is_empty() {
781 return out;
782 }
783 let header = std::iter::once("status".to_string())
784 .chain(columns.iter().cloned())
785 .collect::<Vec<_>>()
786 .join(" | ");
787 let header_style = Style::new().bold().cyan();
788 writeln!(
789 out,
790 " {}",
791 header.if_supports_color(Stream::Stderr, |t| header_style.style(t))
792 )
793 .unwrap();
794 writeln!(
795 out,
796 " {}",
797 "-".repeat(header.len())
798 .if_supports_color(Stream::Stderr, |t| t.cyan())
799 )
800 .unwrap();
801
802 let groups: [(&str, &[BTreeMap<String, String>]); 2] =
803 [("MISSING", missing), ("UNEXPECTED", unexpected)];
804 for (status, rows) in groups {
805 let status_colored = match status {
806 "MISSING" => {
807 let style = Style::new().red().bold();
808 status
809 .if_supports_color(Stream::Stderr, |t| style.style(t))
810 .to_string()
811 }
812 "UNEXPECTED" => {
813 let style = Style::new().yellow().bold();
814 status
815 .if_supports_color(Stream::Stderr, |t| style.style(t))
816 .to_string()
817 }
818 _ => status.to_string(),
819 };
820 for row in rows {
821 let mut values = vec![status_colored.clone()];
822 for col_name in columns {
823 values.push(
824 row.get(col_name)
825 .cloned()
826 .unwrap_or_else(|| "<null>".to_string()),
827 );
828 }
829 writeln!(out, " {}", values.join(" | ")).unwrap();
830 }
831 }
832 out
833}
834
835fn format_assertion_rows_for_junit(
840 columns: &[String],
841 missing: &[BTreeMap<String, String>],
842 unexpected: &[BTreeMap<String, String>],
843) -> String {
844 let groups: [(&str, &str, &[BTreeMap<String, String>]); 2] = [
845 ("MISSING", "expected but not produced", missing),
846 ("UNEXPECTED", "produced but not expected", unexpected),
847 ];
848
849 let mut out = String::new();
850 let mut first = true;
851 for (status, description, rows) in groups {
852 if rows.is_empty() {
853 continue;
854 }
855 if !first {
856 writeln!(out).unwrap();
857 }
858 first = false;
859 writeln!(out, "{} rows ({}):", status, description).unwrap();
860 for row in rows {
861 let pairs: Vec<String> = columns
862 .iter()
863 .map(|col_name| {
864 let value = row.get(col_name).map(String::as_str).unwrap_or("<null>");
865 format!("{}={}", col_name, value)
866 })
867 .collect();
868 writeln!(out, " {}", pairs.join(", ")).unwrap();
869 }
870 }
871 out
872}
873
874fn extract_assertion_data(
879 rows: &[tokio_postgres::SimpleQueryRow],
880) -> (
881 Vec<String>,
882 Vec<BTreeMap<String, String>>,
883 Vec<BTreeMap<String, String>>,
884) {
885 if rows.is_empty() {
886 return (Vec::new(), Vec::new(), Vec::new());
887 }
888
889 let columns: Vec<String> = rows[0]
890 .columns()
891 .iter()
892 .skip(1)
893 .map(|col| col.name().to_string())
894 .collect();
895
896 let mut missing = Vec::new();
897 let mut unexpected = Vec::new();
898
899 for row in rows {
900 let status = row.get(0).unwrap_or("UNKNOWN");
901 let mut map = BTreeMap::new();
902 for (i, col_name) in columns.iter().enumerate() {
903 let value = row.get(i + 1).unwrap_or("<null>").to_string();
904 map.insert(col_name.clone(), value);
905 }
906 match status {
907 "MISSING" => missing.push(map),
908 "UNEXPECTED" => unexpected.push(map),
909 _ => unexpected.push(map),
910 }
911 }
912
913 (columns, missing, unexpected)
914}
915
916fn load_or_generate_types_cache(
922 settings: &Settings,
923 planned_project: &Project,
924) -> Result<Option<ProjectCache>, CliError> {
925 use crate::project::compiler::typecheck;
926
927 let directory = &settings.directory;
928 let external_types = types::load_types_lock(directory).unwrap_or_default();
929
930 typecheck::run(
931 directory,
932 settings.profile_name().unwrap_or(""),
933 settings.profile_suffix(),
934 settings.variables(),
935 planned_project,
936 external_types,
937 )
938 .map_err(CliError::TypeCheckFailed)?;
939
940 ProjectCache::open(
941 directory,
942 settings.profile_name().unwrap_or(""),
943 settings.profile_suffix(),
944 settings.variables(),
945 )
946 .map_err(|e| CliError::Message(format!("Failed to open types cache: {}", e)))
947}