misc.python.materialize.mzcompose.test_result
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

from __future__ import annotations

import re
from dataclasses import dataclass

from materialize import MZ_ROOT
from materialize.ui import CommandFailureCausedUIError, UIError
from materialize.util import filter_cmd

# Matches PEM-armored content; the label of the BEGIN line is captured as group 1.
PEM_CONTENT_RE = r"-----BEGIN ([A-Z ]+)-----[^-]+-----END [A-Z ]+-----"
# Replacement for PEM_CONTENT_RE matches: only the captured label remains, in angle brackets.
PEM_CONTENT_REPLACEMENT = r"<\1>"


@dataclass
class TestResult:
    """Duration and collected failures of a test execution."""

    # NOTE(review): presumably keeps pytest from collecting this class because
    # of its "Test" prefix -- confirm
    __test__ = False

    duration: float
    errors: list[TestFailureDetails]

    def is_successful(self) -> bool:
        """Return True if no errors were recorded."""
        return not self.errors


@dataclass
class TestFailureDetails:
    """Message, details, and optional location of a single test failure."""

    # NOTE(review): presumably keeps pytest from collecting this class because
    # of its "Test" prefix -- confirm
    __test__ = False

    message: str
    details: str | None
    additional_details_header: str | None = None
    additional_details: str | None = None
    test_class_name_override: str | None = None
    """The test class usually describes the framework."""
    test_case_name_override: str | None = None
    """The test case usually describes the workflow, unless more fine-grained information is available."""
    location: str | None = None
    """depending on the check, this may either be a file name or a path"""
    line_number: int | None = None

    def location_as_file_name(self) -> str | None:
        """Return the part of ``location`` after its last "/", or None if no location is set."""
        if self.location is None:
            return None

        # rpartition yields the whole string as the tail if there is no "/"
        return self.location.rpartition("/")[2]


class FailedTestExecutionError(UIError):
    """
    A UIError that is caused by a failing test.

    The individual failures are available through the ``errors`` attribute.
    """

    def __init__(
        self,
        errors: list[TestFailureDetails],
        error_summary: str = "At least one test failed",
    ):
        super().__init__(error_summary)
        self.errors = errors


def _split_location_and_line_number(
    location: str | None,
) -> tuple[str | None, int | None]:
    """Split a "<path>:<line>" location into its path and its line number.

    A missing location or one without ":" is returned unchanged and without a
    line number. The same happens if the part after the first ":" is not a
    number, so that a malformed location cannot raise while a failure is
    being reported.
    """
    if location is None or ":" not in location:
        return location, None

    parts = location.split(":")
    try:
        return parts[0], int(parts[1])
    except ValueError:
        return location, None


def try_determine_errors_from_cmd_execution(
    e: CommandFailureCausedUIError, test_context: str | None
) -> list[TestFailureDetails]:
    """Derive the distinct test failures from the output of a failed command.

    If the error message states that running docker compose failed, a single
    failure describing that is returned. Otherwise one failure is created per
    error chunk found in the output; identical failures are reported only
    once. Returns an empty list if there is no output or no error chunk.
    """
    # stderr takes precedence; stdout is only used if stderr is empty or None
    output = e.stderr or e.stdout

    if "running docker compose failed" in str(e):
        return [determine_error_from_docker_compose_failure(e, output, test_context)]

    if output is None:
        return []

    collected_errors: list[TestFailureDetails] = []
    for chunk in extract_error_chunks_from_output(output):
        match = re.search(r"([^.]+\.td):(\d+):\d+:", chunk)
        if match is not None:
            # for .td files like Postgres CDC, file_path will just contain the file name
            file_path = match.group(1)
            line_number = int(match.group(2))
        else:
            # for .py files like platform checks, file_path will be a path
            file_path, line_number = _split_location_and_line_number(
                try_determine_error_location_from_cmd(e.cmd)
            )

        message = (
            f"Executing {file_path if file_path is not None else 'command'} failed!"
        )

        failure_details = TestFailureDetails(
            message,
            details=chunk,
            test_case_name_override=test_context,
            location=file_path,
            line_number=line_number,
        )

        # do not add an identical error again
        if failure_details not in collected_errors:
            collected_errors.append(failure_details)

    return collected_errors


def determine_error_from_docker_compose_failure(
    e: CommandFailureCausedUIError, output: str | None, test_context: str | None
) -> TestFailureDetails:
    """Build the failure details for a failed docker compose invocation.

    The message contains the sanitized command, prefixed with the test
    context if one is given; the command output becomes the details.
    """
    command = to_sanitized_command_str(e.cmd)
    context_prefix = f"{test_context}: " if test_context is not None else ""
    return TestFailureDetails(
        f"{context_prefix}Docker compose failed: {command}",
        details=output,
        test_case_name_override=test_context,
        location=None,
        line_number=None,
    )


def try_determine_error_location_from_cmd(cmd: list[str]) -> str | None:
    """Return the value of the first ``--source=`` argument of the command.

    Occurrences of the repository root path (``MZ_ROOT`` followed by "/") are
    removed from the value. Returns None if there is no such argument.
    """
    for cmd_part in cmd:
        # the command may contain entries that are not strings; skip those
        if isinstance(cmd_part, str) and cmd_part.startswith("--source="):
            return cmd_part.removeprefix("--source=").replace(f"{MZ_ROOT}/", "")

    return None


def extract_error_chunks_from_output(output: str) -> list[str]:
    """Return the non-empty error chunks that precede the error report.

    Only the text before the first "+++ !!! Error Report" marker is
    considered; it is split at "^^^ +++" and every piece is stripped.
    Returns an empty list if the output does not contain the marker.
    """
    # Partitioning (instead of slicing with "index - 1") avoids a negative
    # slice end when the output starts with the marker.
    error_output, marker, _ = output.partition("+++ !!! Error Report")
    if not marker:
        return []

    stripped_chunks = (chunk.strip() for chunk in error_output.split("^^^ +++"))
    return [chunk for chunk in stripped_chunks if chunk]


def to_sanitized_command_str(cmd: list[str]) -> str:
    """Join the command, as filtered by ``filter_cmd``, and replace PEM content by its label."""
    command_str = " ".join(str(part) for part in filter_cmd(cmd))
    return re.sub(PEM_CONTENT_RE, PEM_CONTENT_REPLACEMENT, command_str)
# Matches PEM-armored content; the label of the BEGIN line is captured as group 1.
PEM_CONTENT_RE = r"-----BEGIN ([A-Z ]+)-----[^-]+-----END [A-Z ]+-----"
# Replacement for PEM_CONTENT_RE matches: only the captured label remains, in angle brackets.
PEM_CONTENT_REPLACEMENT = r"<\1>"
@dataclass
class
TestResult:
@dataclass
class TestResult:
    """Duration and collected failures of a test execution."""

    # NOTE(review): presumably keeps pytest from collecting this class because
    # of its "Test" prefix -- confirm
    __test__ = False

    duration: float
    errors: list[TestFailureDetails]

    def is_successful(self) -> bool:
        """Return True if no errors were recorded."""
        return not self.errors
TestResult( duration: float, errors: list[TestFailureDetails])
errors: list[TestFailureDetails]
@dataclass
class
TestFailureDetails:
@dataclass
class TestFailureDetails:
    """Message, details, and optional location of a single test failure."""

    # NOTE(review): presumably keeps pytest from collecting this class because
    # of its "Test" prefix -- confirm
    __test__ = False

    message: str
    details: str | None
    additional_details_header: str | None = None
    additional_details: str | None = None
    test_class_name_override: str | None = None
    """The test class usually describes the framework."""
    test_case_name_override: str | None = None
    """The test case usually describes the workflow, unless more fine-grained information is available."""
    location: str | None = None
    """depending on the check, this may either be a file name or a path"""
    line_number: int | None = None

    def location_as_file_name(self) -> str | None:
        """Return the part of ``location`` after its last "/", or None if no location is set."""
        if self.location is None:
            return None

        # rpartition yields the whole string as the tail if there is no "/"
        _, _, file_name = self.location.rpartition("/")
        return file_name
TestFailureDetails( message: str, details: str | None, additional_details_header: str | None = None, additional_details: str | None = None, test_class_name_override: str | None = None, test_case_name_override: str | None = None, location: str | None = None, line_number: int | None = None)
class
FailedTestExecutionError(materialize.ui.UIError):
class FailedTestExecutionError(UIError):
    """
    A UIError that is caused by a failing test.

    The individual failures are available through the ``errors`` attribute.
    """

    def __init__(
        self,
        errors: list[TestFailureDetails],
        error_summary: str = "At least one test failed",
    ):
        # the summary is handed to the UIError base class
        super().__init__(error_summary)
        # the failures stay accessible to whoever handles the error
        self.errors = errors
A UIError that is caused by a failing test.
FailedTestExecutionError( errors: list[TestFailureDetails], error_summary: str = 'At least one test failed')
def
try_determine_errors_from_cmd_execution( e: materialize.ui.CommandFailureCausedUIError, test_context: str | None) -> list[TestFailureDetails]:
def _split_location_and_line_number(
    location: str | None,
) -> tuple[str | None, int | None]:
    """Split a "<path>:<line>" location into its path and its line number.

    A missing location or one without ":" is returned unchanged and without a
    line number. The same happens if the part after the first ":" is not a
    number, so that a malformed location cannot raise while a failure is
    being reported.
    """
    if location is None or ":" not in location:
        return location, None

    parts = location.split(":")
    try:
        return parts[0], int(parts[1])
    except ValueError:
        return location, None


def try_determine_errors_from_cmd_execution(
    e: CommandFailureCausedUIError, test_context: str | None
) -> list[TestFailureDetails]:
    """Derive the distinct test failures from the output of a failed command.

    If the error message states that running docker compose failed, a single
    failure describing that is returned. Otherwise one failure is created per
    error chunk found in the output; identical failures are reported only
    once. Returns an empty list if there is no output or no error chunk.
    """
    # stderr takes precedence; stdout is only used if stderr is empty or None
    output = e.stderr or e.stdout

    if "running docker compose failed" in str(e):
        return [determine_error_from_docker_compose_failure(e, output, test_context)]

    if output is None:
        return []

    collected_errors: list[TestFailureDetails] = []
    for chunk in extract_error_chunks_from_output(output):
        match = re.search(r"([^.]+\.td):(\d+):\d+:", chunk)
        if match is not None:
            # for .td files like Postgres CDC, file_path will just contain the file name
            file_path = match.group(1)
            line_number = int(match.group(2))
        else:
            # for .py files like platform checks, file_path will be a path
            file_path, line_number = _split_location_and_line_number(
                try_determine_error_location_from_cmd(e.cmd)
            )

        message = (
            f"Executing {file_path if file_path is not None else 'command'} failed!"
        )

        failure_details = TestFailureDetails(
            message,
            details=chunk,
            test_case_name_override=test_context,
            location=file_path,
            line_number=line_number,
        )

        # do not add an identical error again
        if failure_details not in collected_errors:
            collected_errors.append(failure_details)

    return collected_errors
def
determine_error_from_docker_compose_failure( e: materialize.ui.CommandFailureCausedUIError, output: str | None, test_context: str | None) -> TestFailureDetails:
def determine_error_from_docker_compose_failure(
    e: CommandFailureCausedUIError, output: str | None, test_context: str | None
) -> TestFailureDetails:
    """Build the failure details for a failed docker compose invocation.

    The message contains the sanitized command, prefixed with the test
    context if one is given; the command output becomes the details.
    """
    sanitized_command = to_sanitized_command_str(e.cmd)

    if test_context is None:
        prefix = ""
    else:
        prefix = f"{test_context}: "

    return TestFailureDetails(
        message=f"{prefix}Docker compose failed: {sanitized_command}",
        details=output,
        test_case_name_override=test_context,
        location=None,
        line_number=None,
    )
def
try_determine_error_location_from_cmd(cmd: list[str]) -> str | None:
def try_determine_error_location_from_cmd(cmd: list[str]) -> str | None:
    """Return the value of the first ``--source=`` argument of the command.

    Occurrences of the repository root path (``MZ_ROOT`` followed by "/") are
    removed from the value. Returns None if there is no such argument.
    """
    for cmd_part in cmd:
        # the command may contain entries that are not strings; skip those
        if isinstance(cmd_part, str) and cmd_part.startswith("--source="):
            source = cmd_part.removeprefix("--source=")
            # the root prefix is only needed once a source argument is found
            return source.replace(f"{MZ_ROOT}/", "")

    return None
def
extract_error_chunks_from_output(output: str) -> list[str]:
def extract_error_chunks_from_output(output: str) -> list[str]:
    """Return the non-empty error chunks that precede the error report.

    Only the text before the first "+++ !!! Error Report" marker is
    considered; it is split at "^^^ +++" and every piece is stripped.
    Returns an empty list if the output does not contain the marker.
    """
    # Partitioning (instead of slicing with "index - 1") avoids a negative
    # slice end when the output starts with the marker, which used to return
    # the error report itself as a chunk.
    error_output, marker, _ = output.partition("+++ !!! Error Report")
    if not marker:
        return []

    stripped_chunks = (chunk.strip() for chunk in error_output.split("^^^ +++"))
    return [chunk for chunk in stripped_chunks if chunk]
def
to_sanitized_command_str(cmd: list[str]) -> str: