Module materialize.checks.executors

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import random
import threading
from inspect import Traceback
from typing import Any

from materialize.cloudtest.app.materialize_application import MaterializeApplication
from materialize.mz_version import MzVersion
from materialize.mzcompose.composition import Composition


class Executor:
    # Store the current Materialize version and keep it up-to-date during
    # upgrades so that actions can depend not just on the base version, but
    # also on the current version. This enables testing more interesting
    # scenarios which are still in development and not available a few versions
    # back already.
    current_mz_version: MzVersion
    # All the system settings we have already set in previous Mz versions. No
    # need to set them again in a future version since they should be
    # persisted.
    system_settings: set[str] = set()

    def testdrive(
        self, input: str, caller: Traceback | None = None, mz_service: str | None = None
    ) -> None:
        assert False

    def mzcompose_composition(self) -> Composition:
        assert False

    def cloudtest_application(self) -> MaterializeApplication:
        assert False

    def join(self, handle: Any) -> None:
        pass


class MzcomposeExecutor(Executor):
    def __init__(self, composition: Composition) -> None:
        self.composition = composition

    def mzcompose_composition(self) -> Composition:
        return self.composition

    def testdrive(
        self, input: str, caller: Traceback | None = None, mz_service: str | None = None
    ) -> None:
        self.composition.testdrive(input, caller=caller, mz_service=mz_service)


class MzcomposeExecutorParallel(MzcomposeExecutor):
    def __init__(self, composition: Composition) -> None:
        self.composition = composition
        self.exception: BaseException | None = None

    def testdrive(
        self, input: str, caller: Traceback | None = None, mz_service: str | None = None
    ) -> Any:
        thread = threading.Thread(target=self._testdrive, args=[input, caller])
        thread.start()
        return thread

    def _testdrive(
        self, input: str, caller: Traceback | None = None, mz_service: str | None = None
    ) -> None:
        try:
            self.composition.testdrive(input, caller=caller, mz_service=mz_service)
        except BaseException as e:
            self.exception = e

    def join(self, handle: Any) -> None:
        assert type(handle) is threading.Thread
        handle.join()
        if self.exception:
            raise self.exception


class CloudtestExecutor(Executor):
    def __init__(self, application: MaterializeApplication, version: MzVersion) -> None:
        self.application = application
        self.seed = random.getrandbits(32)
        self.current_mz_version = version

    def cloudtest_application(self) -> MaterializeApplication:
        return self.application

    def testdrive(
        self, input: str, caller: Traceback | None = None, mz_service: str | None = None
    ) -> None:
        assert (
            mz_service is None
        ), "CloudtestExecutor not yet compatible with custom mz_service names"
        self.application.testdrive.run(
            input=input, no_reset=True, seed=self.seed, caller=caller
        )

Classes

class CloudtestExecutor (application: MaterializeApplication, version: MzVersion)
Expand source code Browse git
class CloudtestExecutor(Executor):
    def __init__(self, application: MaterializeApplication, version: MzVersion) -> None:
        self.application = application
        self.seed = random.getrandbits(32)
        self.current_mz_version = version

    def cloudtest_application(self) -> MaterializeApplication:
        return self.application

    def testdrive(
        self, input: str, caller: Traceback | None = None, mz_service: str | None = None
    ) -> None:
        assert (
            mz_service is None
        ), "CloudtestExecutor not yet compatible with custom mz_service names"
        self.application.testdrive.run(
            input=input, no_reset=True, seed=self.seed, caller=caller
        )

Ancestors

Methods

def cloudtest_application(self) ‑> MaterializeApplication
Expand source code Browse git
def cloudtest_application(self) -> MaterializeApplication:
    return self.application
def testdrive(self, input: str, caller: inspect.Traceback | None = None, mz_service: str | None = None) ‑> None
Expand source code Browse git
def testdrive(
    self, input: str, caller: Traceback | None = None, mz_service: str | None = None
) -> None:
    assert (
        mz_service is None
    ), "CloudtestExecutor not yet compatible with custom mz_service names"
    self.application.testdrive.run(
        input=input, no_reset=True, seed=self.seed, caller=caller
    )
class Executor
Expand source code Browse git
class Executor:
    # Store the current Materialize version and keep it up-to-date during
    # upgrades so that actions can depend not just on the base version, but
    # also on the current version. This enables testing more interesting
    # scenarios which are still in development and not available a few versions
    # back already.
    current_mz_version: MzVersion
    # All the system settings we have already set in previous Mz versions. No
    # need to set them again in a future version since they should be
    # persisted.
    system_settings: set[str] = set()

    def testdrive(
        self, input: str, caller: Traceback | None = None, mz_service: str | None = None
    ) -> None:
        assert False

    def mzcompose_composition(self) -> Composition:
        assert False

    def cloudtest_application(self) -> MaterializeApplication:
        assert False

    def join(self, handle: Any) -> None:
        pass

Subclasses

Class variables

var current_mz_versionMzVersion
var system_settings : set[str]

Methods

def cloudtest_application(self) ‑> MaterializeApplication
Expand source code Browse git
def cloudtest_application(self) -> MaterializeApplication:
    assert False
def join(self, handle: Any) ‑> None
Expand source code Browse git
def join(self, handle: Any) -> None:
    pass
def mzcompose_composition(self) ‑> Composition
Expand source code Browse git
def mzcompose_composition(self) -> Composition:
    assert False
def testdrive(self, input: str, caller: inspect.Traceback | None = None, mz_service: str | None = None) ‑> None
Expand source code Browse git
def testdrive(
    self, input: str, caller: Traceback | None = None, mz_service: str | None = None
) -> None:
    assert False
class MzcomposeExecutor (composition: Composition)
Expand source code Browse git
class MzcomposeExecutor(Executor):
    def __init__(self, composition: Composition) -> None:
        self.composition = composition

    def mzcompose_composition(self) -> Composition:
        return self.composition

    def testdrive(
        self, input: str, caller: Traceback | None = None, mz_service: str | None = None
    ) -> None:
        self.composition.testdrive(input, caller=caller, mz_service=mz_service)

Ancestors

Subclasses

Methods

def mzcompose_composition(self) ‑> Composition
Expand source code Browse git
def mzcompose_composition(self) -> Composition:
    return self.composition
def testdrive(self, input: str, caller: inspect.Traceback | None = None, mz_service: str | None = None) ‑> None
Expand source code Browse git
def testdrive(
    self, input: str, caller: Traceback | None = None, mz_service: str | None = None
) -> None:
    self.composition.testdrive(input, caller=caller, mz_service=mz_service)
class MzcomposeExecutorParallel (composition: Composition)
Expand source code Browse git
class MzcomposeExecutorParallel(MzcomposeExecutor):
    def __init__(self, composition: Composition) -> None:
        self.composition = composition
        self.exception: BaseException | None = None

    def testdrive(
        self, input: str, caller: Traceback | None = None, mz_service: str | None = None
    ) -> Any:
        thread = threading.Thread(target=self._testdrive, args=[input, caller])
        thread.start()
        return thread

    def _testdrive(
        self, input: str, caller: Traceback | None = None, mz_service: str | None = None
    ) -> None:
        try:
            self.composition.testdrive(input, caller=caller, mz_service=mz_service)
        except BaseException as e:
            self.exception = e

    def join(self, handle: Any) -> None:
        assert type(handle) is threading.Thread
        handle.join()
        if self.exception:
            raise self.exception

Ancestors

Methods

def join(self, handle: Any) ‑> None
Expand source code Browse git
def join(self, handle: Any) -> None:
    assert type(handle) is threading.Thread
    handle.join()
    if self.exception:
        raise self.exception
def testdrive(self, input: str, caller: inspect.Traceback | None = None, mz_service: str | None = None) ‑> Any
Expand source code Browse git
def testdrive(
    self, input: str, caller: Traceback | None = None, mz_service: str | None = None
) -> Any:
    thread = threading.Thread(target=self._testdrive, args=[input, caller])
    thread.start()
    return thread