Module materialize.cloudtest.util.common

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import logging
import subprocess
from collections.abc import Callable
from textwrap import dedent
from time import sleep
from typing import Any, cast

LOGGER = logging.getLogger(__name__)


def retry(
    f: Callable[[], Any],
    max_attempts: int,
    exception_types: list[type[Exception]],
    sleep_secs: int = 1,
    message: str | None = None,
) -> Any:
    result: Any = None
    for attempt in range(1, max_attempts + 1):
        try:
            result = f()
            break
        except tuple(exception_types) as e:
            if attempt == max_attempts:
                if message:
                    LOGGER.info(message)
                else:
                    LOGGER.error(f"Exception in attempt {attempt}: ", exc_info=e)
                raise
            sleep(sleep_secs)
    return result


def is_subdict(
    larger_dict: dict[str, Any],
    smaller_dict: dict[str, Any],
    key_path: str = "",
) -> bool:
    """Return whether ``smaller_dict`` is recursively contained in ``larger_dict``.

    Every key of ``smaller_dict`` must exist in ``larger_dict``. Values are
    compared as follows:

    * dict values only need to be sub-dicts of the corresponding value;
    * list values are matched position by position (item ``i`` against item
      ``i``); the larger list may contain additional trailing items;
    * any other value must compare equal.

    An empty dict or list on the smaller side places no constraint on the
    corresponding value. A value of the wrong container type on the larger
    side (for example ``None`` where a non-empty dict is expected) is
    reported as a mismatch instead of raising ``TypeError``.

    The first mismatch found is logged as a warning, prefixed with its
    dotted path, and ends the comparison.

    Args:
        larger_dict: The structure that is expected to contain the other one.
        smaller_dict: The structure that must be contained.
        key_path: Dotted path prefix used in log messages.

    Returns:
        True if ``smaller_dict`` is contained in ``larger_dict``, else False.
    """

    def is_subvalue(larger_value: Any, smaller_value: Any, path: str) -> bool:
        # Dispatch on the type of the *expected* (smaller) value.
        if isinstance(smaller_value, dict):
            return is_subdict(
                larger_value,
                cast(dict[str, Any], smaller_value),
                path,
            )
        if isinstance(smaller_value, list):
            return is_sublist(larger_value, smaller_value, path)
        if smaller_value != larger_value:
            LOGGER.warning(
                f"{path}: scalar value does not match: {smaller_value} != {larger_value}",
            )
            return False
        return True

    def is_sublist(
        larger_list: list[Any], smaller_list: list[Any], key_path: str = ""
    ) -> bool:
        # Items are matched by position. Dict items are allowed to be
        # subdicts of their counterpart rather than exact matches.
        if not smaller_list:
            # Nothing is expected, so anything matches.
            return True
        if not isinstance(larger_list, (list, tuple)):
            # Without this check a str would be indexed character by
            # character and None/int would raise TypeError in len().
            LOGGER.warning(
                f"{key_path}: expected a list but found {type(larger_list).__name__}"
            )
            return False
        if len(larger_list) < len(smaller_list):
            LOGGER.warning(f"{key_path}: smaller_list is larger than larger_list")
            return False
        return all(
            is_subvalue(larger_list[i], value, f"{key_path}.{i}")
            for i, value in enumerate(smaller_list)
        )

    if smaller_dict and not isinstance(larger_dict, dict):
        # Checked only when keys are expected, so that an empty expectation
        # keeps matching any value.
        LOGGER.warning(
            f"{key_path}: expected a dict but found {type(larger_dict).__name__}"
        )
        return False

    for key, value in smaller_dict.items():
        current_key = f"{key_path}.{key}"
        if key not in larger_dict:
            LOGGER.warning(f"{current_key}: key not found in larger_dict")
            return False
        if not is_subvalue(larger_dict[key], value, current_key):
            return False
    return True


def run_process_with_error_information(
    cmd: list[str], input: str | None = None, capture_output: bool = False
) -> None:
    try:
        subprocess.run(
            cmd, text=True, input=input, check=True, capture_output=capture_output
        )
    except subprocess.CalledProcessError as e:
        log_subprocess_error(e)
        raise e


def log_subprocess_error(e: subprocess.CalledProcessError) -> None:
    """Log the details of a failed subprocess call at ERROR level.

    The message lists the command, its return code, and the ``stdout`` and
    ``stderr`` attributes of the exception, one labelled field per line.

    Args:
        e: The exception describing the failed subprocess call.
    """
    # Dedent the template *before* filling in the values. Dedenting the
    # interpolated text breaks as soon as stdout/stderr span several lines:
    # their unindented lines shrink the common margin to nothing, so the
    # labels would keep their source-code indentation.
    template = dedent(
        """
        cmd: {cmd}
        returncode: {returncode}
        stdout: {stdout}
        stderr: {stderr}
        """
    )
    LOGGER.error(
        template.format(
            cmd=e.cmd,
            returncode=e.returncode,
            stdout=e.stdout,
            stderr=e.stderr,
        )
    )

Functions

def is_subdict(larger_dict: dict[str, typing.Any], smaller_dict: dict[str, typing.Any], key_path: str = '') ‑> bool
Expand source code Browse git
def is_subdict(
    larger_dict: dict[str, Any],
    smaller_dict: dict[str, Any],
    key_path: str = "",
) -> bool:
    """Return whether ``smaller_dict`` is recursively contained in ``larger_dict``.

    Every key of ``smaller_dict`` must exist in ``larger_dict``. Values are
    compared as follows:

    * dict values only need to be sub-dicts of the corresponding value;
    * list values are matched position by position (item ``i`` against item
      ``i``); the larger list may contain additional trailing items;
    * any other value must compare equal.

    An empty dict or list on the smaller side places no constraint on the
    corresponding value. A value of the wrong container type on the larger
    side (for example ``None`` where a non-empty dict is expected) is
    reported as a mismatch instead of raising ``TypeError``.

    The first mismatch found is logged as a warning, prefixed with its
    dotted path, and ends the comparison.

    Args:
        larger_dict: The structure that is expected to contain the other one.
        smaller_dict: The structure that must be contained.
        key_path: Dotted path prefix used in log messages.

    Returns:
        True if ``smaller_dict`` is contained in ``larger_dict``, else False.
    """

    def is_subvalue(larger_value: Any, smaller_value: Any, path: str) -> bool:
        # Dispatch on the type of the *expected* (smaller) value.
        if isinstance(smaller_value, dict):
            return is_subdict(
                larger_value,
                cast(dict[str, Any], smaller_value),
                path,
            )
        if isinstance(smaller_value, list):
            return is_sublist(larger_value, smaller_value, path)
        if smaller_value != larger_value:
            LOGGER.warning(
                f"{path}: scalar value does not match: {smaller_value} != {larger_value}",
            )
            return False
        return True

    def is_sublist(
        larger_list: list[Any], smaller_list: list[Any], key_path: str = ""
    ) -> bool:
        # Items are matched by position. Dict items are allowed to be
        # subdicts of their counterpart rather than exact matches.
        if not smaller_list:
            # Nothing is expected, so anything matches.
            return True
        if not isinstance(larger_list, (list, tuple)):
            # Without this check a str would be indexed character by
            # character and None/int would raise TypeError in len().
            LOGGER.warning(
                f"{key_path}: expected a list but found {type(larger_list).__name__}"
            )
            return False
        if len(larger_list) < len(smaller_list):
            LOGGER.warning(f"{key_path}: smaller_list is larger than larger_list")
            return False
        return all(
            is_subvalue(larger_list[i], value, f"{key_path}.{i}")
            for i, value in enumerate(smaller_list)
        )

    if smaller_dict and not isinstance(larger_dict, dict):
        # Checked only when keys are expected, so that an empty expectation
        # keeps matching any value.
        LOGGER.warning(
            f"{key_path}: expected a dict but found {type(larger_dict).__name__}"
        )
        return False

    for key, value in smaller_dict.items():
        current_key = f"{key_path}.{key}"
        if key not in larger_dict:
            LOGGER.warning(f"{current_key}: key not found in larger_dict")
            return False
        if not is_subvalue(larger_dict[key], value, current_key):
            return False
    return True
def log_subprocess_error(e: subprocess.CalledProcessError) ‑> None
Expand source code Browse git
def log_subprocess_error(e: subprocess.CalledProcessError) -> None:
    """Log the details of a failed subprocess call at ERROR level.

    The message lists the command, its return code, and the ``stdout`` and
    ``stderr`` attributes of the exception, one labelled field per line.

    Args:
        e: The exception describing the failed subprocess call.
    """
    # Dedent the template *before* filling in the values. Dedenting the
    # interpolated text breaks as soon as stdout/stderr span several lines:
    # their unindented lines shrink the common margin to nothing, so the
    # labels would keep their source-code indentation.
    template = dedent(
        """
        cmd: {cmd}
        returncode: {returncode}
        stdout: {stdout}
        stderr: {stderr}
        """
    )
    LOGGER.error(
        template.format(
            cmd=e.cmd,
            returncode=e.returncode,
            stdout=e.stdout,
            stderr=e.stderr,
        )
    )
def retry(f: collections.abc.Callable[[], typing.Any], max_attempts: int, exception_types: list[type[Exception]], sleep_secs: int = 1, message: str | None = None) ‑> Any
Expand source code Browse git
def retry(
    f: Callable[[], Any],
    max_attempts: int,
    exception_types: list[type[Exception]],
    sleep_secs: int = 1,
    message: str | None = None,
) -> Any:
    result: Any = None
    for attempt in range(1, max_attempts + 1):
        try:
            result = f()
            break
        except tuple(exception_types) as e:
            if attempt == max_attempts:
                if message:
                    LOGGER.info(message)
                else:
                    LOGGER.error(f"Exception in attempt {attempt}: ", exc_info=e)
                raise
            sleep(sleep_secs)
    return result
def run_process_with_error_information(cmd: list[str], input: str | None = None, capture_output: bool = False) ‑> None
Expand source code Browse git
def run_process_with_error_information(
    cmd: list[str], input: str | None = None, capture_output: bool = False
) -> None:
    try:
        subprocess.run(
            cmd, text=True, input=input, check=True, capture_output=capture_output
        )
    except subprocess.CalledProcessError as e:
        log_subprocess_error(e)
        raise e