Module materialize.mzcompose.test_result

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

from __future__ import annotations

import re
from dataclasses import dataclass

from materialize import MZ_ROOT
from materialize.ui import CommandFailureCausedUIError, UIError

PEM_CONTENT_RE = r"-----BEGIN ([A-Z ]+)-----[^-]+-----END [A-Z ]+-----"
PEM_CONTENT_REPLACEMENT = r"<\1>"


@dataclass
class TestResult:
    duration: float
    errors: list[TestFailureDetails]

    def is_successful(self) -> bool:
        return len(self.errors) == 0


@dataclass
class TestFailureDetails:
    __test__ = False

    message: str
    details: str | None
    test_class_name_override: str | None = None
    test_case_name_override: str | None = None
    # depending on the check, this may either be a file name or a path
    location: str | None = None
    line_number: int | None = None

    def location_as_file_name(self) -> str | None:
        if self.location is None:
            return None

        if "/" in self.location:
            return self.location[self.location.rindex("/") + 1 :]

        return self.location


class FailedTestExecutionError(UIError):
    """
    An UIError that is caused by a failing test.
    """

    def __init__(
        self,
        errors: list[TestFailureDetails],
        error_summary: str = "At least one test failed",
    ):
        super().__init__(error_summary)
        self.errors = errors


def try_determine_errors_from_cmd_execution(
    e: CommandFailureCausedUIError,
) -> list[TestFailureDetails]:
    output = e.stderr or e.stdout

    if "running docker compose failed" in str(e):
        return [determine_error_from_docker_compose_failure(e, output)]

    if output is None:
        return []

    error_chunks = extract_error_chunks_from_output(output)

    collected_errors = []
    for chunk in error_chunks:
        match = re.search(r"([^.]+\.td):(\d+):\d+:", chunk)
        if match is not None:
            # for .td files like Postgres CDC, file_path will just contain the file name
            file_path = match.group(1)
            line_number = int(match.group(2))
        else:
            # for .py files like platform checks, file_path will be a path
            file_path = try_determine_error_location_from_cmd(e.cmd)
            if file_path is None or ":" not in file_path:
                line_number = None
            else:
                parts = file_path.split(":")
                file_path = parts[0]
                line_number = int(parts[1])

        message = (
            f"Executing {file_path if file_path is not None else 'command'} failed!"
        )

        failure_details = TestFailureDetails(
            message,
            details=chunk,
            location=file_path,
            line_number=line_number,
        )

        if failure_details in collected_errors:
            # do not add an identical error again
            pass
        else:
            collected_errors.append(failure_details)

    return collected_errors


def determine_error_from_docker_compose_failure(
    e: CommandFailureCausedUIError, output: str | None
) -> TestFailureDetails:
    command = to_sanitized_command_str(e.cmd)
    return TestFailureDetails(
        f"Docker compose failed: {command}",
        details=output,
        location=None,
        line_number=None,
    )


def try_determine_error_location_from_cmd(cmd: list[str]) -> str | None:
    root_path_as_string = f"{MZ_ROOT}/"
    for cmd_part in cmd:
        if type(cmd_part) == str and cmd_part.startswith("--source="):
            return cmd_part.removeprefix("--source=").replace(root_path_as_string, "")

    return None


def extract_error_chunks_from_output(output: str) -> list[str]:
    if "+++ !!! Error Report" not in output:
        return []

    error_output = output[: output.index("+++ !!! Error Report") - 1]
    error_chunks = error_output.split("^^^ +++")

    return [chunk.strip() for chunk in error_chunks if len(chunk.strip()) > 0]


def to_sanitized_command_str(cmd: list[str]) -> str:
    command_str = " ".join([str(x) for x in cmd])
    return re.sub(PEM_CONTENT_RE, PEM_CONTENT_REPLACEMENT, command_str)

Functions

def determine_error_from_docker_compose_failure(e: CommandFailureCausedUIError, output: str | None) ‑> TestFailureDetails
Expand source code Browse git
def determine_error_from_docker_compose_failure(
    e: CommandFailureCausedUIError, output: str | None
) -> TestFailureDetails:
    command = to_sanitized_command_str(e.cmd)
    return TestFailureDetails(
        f"Docker compose failed: {command}",
        details=output,
        location=None,
        line_number=None,
    )
def extract_error_chunks_from_output(output: str) ‑> list[str]
Expand source code Browse git
def extract_error_chunks_from_output(output: str) -> list[str]:
    if "+++ !!! Error Report" not in output:
        return []

    error_output = output[: output.index("+++ !!! Error Report") - 1]
    error_chunks = error_output.split("^^^ +++")

    return [chunk.strip() for chunk in error_chunks if len(chunk.strip()) > 0]
def to_sanitized_command_str(cmd: list[str]) ‑> str
Expand source code Browse git
def to_sanitized_command_str(cmd: list[str]) -> str:
    command_str = " ".join([str(x) for x in cmd])
    return re.sub(PEM_CONTENT_RE, PEM_CONTENT_REPLACEMENT, command_str)
def try_determine_error_location_from_cmd(cmd: list[str]) ‑> str | None
Expand source code Browse git
def try_determine_error_location_from_cmd(cmd: list[str]) -> str | None:
    root_path_as_string = f"{MZ_ROOT}/"
    for cmd_part in cmd:
        if type(cmd_part) == str and cmd_part.startswith("--source="):
            return cmd_part.removeprefix("--source=").replace(root_path_as_string, "")

    return None
def try_determine_errors_from_cmd_execution(e: CommandFailureCausedUIError) ‑> list[TestFailureDetails]
Expand source code Browse git
def try_determine_errors_from_cmd_execution(
    e: CommandFailureCausedUIError,
) -> list[TestFailureDetails]:
    output = e.stderr or e.stdout

    if "running docker compose failed" in str(e):
        return [determine_error_from_docker_compose_failure(e, output)]

    if output is None:
        return []

    error_chunks = extract_error_chunks_from_output(output)

    collected_errors = []
    for chunk in error_chunks:
        match = re.search(r"([^.]+\.td):(\d+):\d+:", chunk)
        if match is not None:
            # for .td files like Postgres CDC, file_path will just contain the file name
            file_path = match.group(1)
            line_number = int(match.group(2))
        else:
            # for .py files like platform checks, file_path will be a path
            file_path = try_determine_error_location_from_cmd(e.cmd)
            if file_path is None or ":" not in file_path:
                line_number = None
            else:
                parts = file_path.split(":")
                file_path = parts[0]
                line_number = int(parts[1])

        message = (
            f"Executing {file_path if file_path is not None else 'command'} failed!"
        )

        failure_details = TestFailureDetails(
            message,
            details=chunk,
            location=file_path,
            line_number=line_number,
        )

        if failure_details in collected_errors:
            # do not add an identical error again
            pass
        else:
            collected_errors.append(failure_details)

    return collected_errors

Classes

class FailedTestExecutionError (errors: list[TestFailureDetails], error_summary: str = 'At least one test failed')

An UIError that is caused by a failing test.

Expand source code Browse git
class FailedTestExecutionError(UIError):
    """
    An UIError that is caused by a failing test.
    """

    def __init__(
        self,
        errors: list[TestFailureDetails],
        error_summary: str = "At least one test failed",
    ):
        super().__init__(error_summary)
        self.errors = errors

Ancestors

  • UIError
  • builtins.Exception
  • builtins.BaseException

Inherited members

class TestFailureDetails (message: str, details: str | None, test_class_name_override: str | None = None, test_case_name_override: str | None = None, location: str | None = None, line_number: int | None = None)

TestFailureDetails(message: 'str', details: 'str | None', test_class_name_override: 'str | None' = None, test_case_name_override: 'str | None' = None, location: 'str | None' = None, line_number: 'int | None' = None)

Expand source code Browse git
@dataclass
class TestFailureDetails:
    __test__ = False

    message: str
    details: str | None
    test_class_name_override: str | None = None
    test_case_name_override: str | None = None
    # depending on the check, this may either be a file name or a path
    location: str | None = None
    line_number: int | None = None

    def location_as_file_name(self) -> str | None:
        if self.location is None:
            return None

        if "/" in self.location:
            return self.location[self.location.rindex("/") + 1 :]

        return self.location

Class variables

var details : str | None
var line_number : int | None
var location : str | None
var message : str
var test_case_name_override : str | None
var test_class_name_override : str | None

Methods

def location_as_file_name(self) ‑> str | None
Expand source code Browse git
def location_as_file_name(self) -> str | None:
    if self.location is None:
        return None

    if "/" in self.location:
        return self.location[self.location.rindex("/") + 1 :]

    return self.location
class TestResult (duration: float, errors: list[TestFailureDetails])

TestResult(duration: 'float', errors: 'list[TestFailureDetails]')

Expand source code Browse git
@dataclass
class TestResult:
    duration: float
    errors: list[TestFailureDetails]

    def is_successful(self) -> bool:
        return len(self.errors) == 0

Class variables

var duration : float
var errors : list[TestFailureDetails]

Methods

def is_successful(self) ‑> bool
Expand source code Browse git
def is_successful(self) -> bool:
    return len(self.errors) == 0