Module materialize.zippy.framework

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import random
from collections.abc import Sequence
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, TypeVar, Union

from materialize.mzcompose.composition import Composition

if TYPE_CHECKING:
    from materialize.zippy.scenarios import Scenario


class Capability:
    """Abstract description of one fact that holds in a Zippy test context.

    Each instance stands for a single true condition, for example
    "a table with name 'foo' exists".
    """

    # Name of the object this capability refers to.
    name: str

    @classmethod
    def format_str(cls) -> str:
        """Return the format string used to build names for this capability type.

        The base class defines no such string and always raises
        `NotImplementedError`.
        """
        raise NotImplementedError


# Type variable standing for a concrete `Capability` subclass, so that typed
# lookups can return a list of the same type they were queried with.
T = TypeVar("T", bound=Capability)
# What a scenario can schedule: either an `Action` subclass (the class
# itself, not an instance) or an `ActionFactory` instance.
ActionOrFactory = Union[type["Action"], "ActionFactory"]


class Capabilities:
    """A set of `Capability`s.

    Lookups and type-based removal match on the exact class of each stored
    capability; an instance of a subclass does not match its parent type.
    """

    _capabilities: Sequence[Capability]

    def __init__(self) -> None:
        self._capabilities = []

    def _extend(self, capabilities: Sequence[Capability]) -> None:
        """Add new capabilities."""
        self._capabilities = [*self._capabilities, *capabilities]

    def remove_capability_instance(self, capability: Capability) -> None:
        """Remove every stored capability that compares equal to `capability`."""
        self._capabilities = [
            cap for cap in self._capabilities if not cap == capability
        ]

    def _remove(self, capabilities: set[type[T]]) -> None:
        """Remove all existing capabilities of the specified types."""
        self._capabilities = [
            cap for cap in self._capabilities if type(cap) not in capabilities
        ]

    def provides(self, capability: type[T]) -> bool:
        """Report whether any capability of the specified type exists."""
        return bool(self.get(capability))

    def get(self, capability: type[T]) -> list[T]:
        """Get all capabilities of the specified type, in insertion order."""
        matches: list[T] = [
            # NOTE: unfortunately pyright can't handle this
            cap
            for cap in self._capabilities
            # Exact-type match on purpose: subclasses are distinct capabilities.
            if type(cap) is capability  # type: ignore
        ]
        return matches

    def get_capability_names(self, capability: type[T]) -> list[str]:
        """Return the `name` of every stored capability of the specified type."""
        return [t.name for t in self.get(capability)]

    def get_free_capability_name(
        self, capability: type[T], max_objects: int
    ) -> str | None:
        """Pick a random unused name for a capability of the specified type.

        Candidate names are produced by formatting `capability.format_str()`
        with each index in `range(max_objects)`.

        Returns:
            A candidate name not used by any stored capability of that type,
            or None when every candidate is already taken.
        """
        taken = set(self.get_capability_names(capability))
        # Keep the candidates in index order instead of round-tripping them
        # through a set: string hashing is randomized per process, so choosing
        # from an unordered set would differ between runs even when the
        # `random` module is seeded identically.
        candidates = dict.fromkeys(
            capability.format_str().format(i) for i in range(max_objects)
        )
        free_names = [name for name in candidates if name not in taken]
        return random.choice(free_names) if free_names else None


class Action:
    """Base class for an action that a Zippy test can take."""

    # Class-wide counter: every constructed action, of any subclass that
    # calls this constructor, takes the next value as its `seqno`.
    current_seqno: int = 0

    def __init__(self, capabilities: Capabilities) -> None:
        """Construct a new action, possibly conditioning on the available
        capabilities.

        This base implementation ignores `capabilities` and only assigns the
        next sequence number.
        """
        Action.current_seqno += 1
        self.seqno = Action.current_seqno

    @classmethod
    def requires(cls) -> set[type[Capability]] | list[set[type[Capability]]]:
        """Compute the capability classes that this action requires.

        Returns either one set of classes that must all be present, or a list
        of alternative sets of which one must be fully present.
        """
        return set()

    @classmethod
    def incompatible_with(cls) -> set[type[Capability]]:
        """The capability classes that this action is not compatible with."""
        return set()

    def withholds(self) -> set[type[Capability]]:
        """Compute the capability classes that this action will make unavailable."""
        return set()

    def provides(self) -> list[Capability]:
        """Compute the capabilities that this action will make available."""
        return []

    def run(self, c: Composition) -> None:
        """Run this action on the provided composition.

        Raises:
            NotImplementedError: always, in this base class.
        """
        # Raise explicitly instead of `assert False`: assertions are removed
        # under `python -O`, which would turn this into a silent no-op.
        raise NotImplementedError

    @classmethod
    def require_explicit_mention(cls) -> bool:
        """Only use if explicitly mentioned by name in a Scenario."""
        return False

    def __str__(self) -> str:
        return f"--- #{self.seqno}: {self.__class__.__name__}"


class ActionFactory:
    """Base class for Action Factories that return parameterized Actions to execute."""

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Create the actions to execute, given the current capabilities.

        Raises:
            NotImplementedError: always, in this base class.
        """
        # Raise explicitly instead of `assert False`: assertions are removed
        # under `python -O`, which would make this silently return None.
        raise NotImplementedError

    @classmethod
    def requires(cls) -> set[type[Capability]] | list[set[type[Capability]]]:
        """Compute the capability classes that this Action Factory requires.

        Returns either one set of classes that must all be present, or a list
        of alternative sets of which one must be fully present.
        """
        return set()

    @classmethod
    def incompatible_with(cls) -> set[type[Capability]]:
        """The capability classes that this Action Factory is not compatible with."""
        return set()


class Test:
    """A Zippy test, consisting of a sequence of actions."""

    def __init__(
        self, scenario: "Scenario", actions: int, max_execution_time: timedelta
    ) -> None:
        """Generate a new Zippy test.

        Args:
            scenario: The Scenario to pick actions from.
            actions: The number of actions to generate.
            max_execution_time: Time budget for the generated actions; once it
                is exceeded, `run` skips ahead to the finalization actions.
        """
        self._scenario = scenario
        self._actions: list[Action] = []
        self._final_actions: list[Action] = []
        self._capabilities = Capabilities()
        self._actions_with_weight: dict[
            ActionOrFactory, float
        ] = self._scenario.actions_with_weight()
        self._max_execution_time: timedelta = max_execution_time

        # Bootstrap actions count towards the requested number of actions.
        for action_or_factory in self._scenario.bootstrap():
            self._actions.extend(self.generate_actions(action_or_factory))

        while len(self._actions) < actions:
            action_or_factory = self._pick_action_or_factory()
            self._actions.extend(self.generate_actions(action_or_factory))

        # Finalization actions are kept separately so that `run` executes them
        # even when the time budget cuts the main sequence short.
        for action_or_factory in self._scenario.finalization():
            self._final_actions.extend(self.generate_actions(action_or_factory))

    def generate_actions(self, action_def: ActionOrFactory) -> list[Action]:
        """Instantiate the actions for `action_def` and track their effects.

        An `ActionFactory` instance is asked for its actions via `new()`; an
        `Action` subclass is instantiated once. For every resulting action,
        the capabilities it provides are added to this test's capability set
        and the capability types it withholds are removed from it.

        Raises:
            AssertionError: if `action_def` is a class that is not an
                `Action` subclass.
        """
        if isinstance(action_def, ActionFactory):
            actions = action_def.new(capabilities=self._capabilities)
        elif issubclass(action_def, Action):
            actions = [action_def(capabilities=self._capabilities)]
        else:
            # Raised explicitly (not via `assert`) so the check survives
            # `python -O` instead of leaving `actions` unbound.
            raise AssertionError(f"Unsupported action definition: {action_def!r}")

        for action in actions:
            print("test:", action)
            # Query each hook exactly once and reuse the result for the log
            # line, so the logged value is the one that was applied.
            provided = action.provides()
            self._capabilities._extend(provided)
            print(" - ", self._capabilities, provided)
            withheld = action.withholds()
            self._capabilities._remove(withheld)
            print(" - ", self._capabilities, withheld)

        return actions

    def run(self, c: Composition) -> None:
        """Run the Zippy test.

        The main actions run in order. The time budget is checked after each
        one completes; once exceeded, the remaining main actions are skipped.
        The finalization actions run afterwards in either case.
        """
        max_time = datetime.now() + self._max_execution_time
        for action in self._actions:
            print(action)
            action.run(c)
            if datetime.now() > max_time:
                print(
                    f"--- Desired execution time of {self._max_execution_time} has been reached."
                )
                break

        for action in self._final_actions:
            print(action)
            action.run(c)

    def _pick_action_or_factory(self) -> ActionOrFactory:
        """Select the next Action to run in the Test.

        Each scenario entry's weight is split evenly among those of its
        alternatives that can currently run; one alternative is then drawn at
        random according to these weights.
        """
        actions_or_factories: list[ActionOrFactory] = []
        class_weights: list[float] = []

        for action_or_factory, weight in self._actions_with_weight.items():
            # We do not drill down if it is an ActionFactory
            # If it is an Action, drill down for any children
            subclasses: list[ActionOrFactory] = (
                [action_or_factory]
                if isinstance(action_or_factory, ActionFactory)
                else self._all_subclasses(action_or_factory)
            )

            # Do not pick an Action whose requirements can not be satisfied
            alternatives = [sub for sub in subclasses if self._can_run(sub)]

            actions_or_factories.extend(alternatives)
            # Lazy generator: the division never runs when nothing qualifies.
            class_weights.extend(weight / len(alternatives) for _ in alternatives)

        assert (
            len(actions_or_factories) > 0
        ), "No actions available to take. You may be stopping or deleting items without starting them again."

        return random.choices(actions_or_factories, weights=class_weights, k=1)[0]

    def _can_run(self, action: ActionOrFactory) -> bool:
        """Report whether `action` is runnable with the current capabilities.

        It is not runnable if any capability type it is incompatible with is
        present. Otherwise a set of requirements must be present in full, and
        a list of requirement sets needs one set that is present in full.
        """
        if any(
            self._capabilities.provides(dislike)
            for dislike in action.incompatible_with()
        ):
            return False

        requires = action.requires()
        if isinstance(requires, set):
            return all(self._capabilities.provides(req) for req in requires)
        return any(
            all(self._capabilities.provides(req) for req in one_alternative)
            for one_alternative in requires
        )

    def _all_subclasses(self, cls: type[Action]) -> list[ActionOrFactory]:
        """Return the leaf Actions below `cls`, or `cls` itself if it has none.

        Subclasses that require explicit mention are ignored when descending.
        """
        children = [c for c in cls.__subclasses__() if not c.require_explicit_mention()]
        if not children:
            return [cls]
        subclasses: list[ActionOrFactory] = []
        for c in children:
            subclasses.extend(self._all_subclasses(c))
        return subclasses

Classes

class Action (capabilities: Capabilities)

Base class for an action that a Zippy test can take.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class Action:
    """Base class for an action that a Zippy test can take."""

    # Class-wide counter: every constructed action, of any subclass that
    # calls this constructor, takes the next value as its `seqno`.
    current_seqno: int = 0

    def __init__(self, capabilities: Capabilities) -> None:
        """Construct a new action, possibly conditioning on the available
        capabilities.

        This base implementation ignores `capabilities` and only assigns the
        next sequence number.
        """
        Action.current_seqno += 1
        self.seqno = Action.current_seqno

    @classmethod
    def requires(cls) -> set[type[Capability]] | list[set[type[Capability]]]:
        """Compute the capability classes that this action requires.

        Returns either one set of classes that must all be present, or a list
        of alternative sets of which one must be fully present.
        """
        return set()

    @classmethod
    def incompatible_with(cls) -> set[type[Capability]]:
        """The capability classes that this action is not compatible with."""
        return set()

    def withholds(self) -> set[type[Capability]]:
        """Compute the capability classes that this action will make unavailable."""
        return set()

    def provides(self) -> list[Capability]:
        """Compute the capabilities that this action will make available."""
        return []

    def run(self, c: Composition) -> None:
        """Run this action on the provided composition.

        Raises:
            NotImplementedError: always, in this base class.
        """
        # Raise explicitly instead of `assert False`: assertions are removed
        # under `python -O`, which would turn this into a silent no-op.
        raise NotImplementedError

    @classmethod
    def require_explicit_mention(cls) -> bool:
        """Only use if explicitly mentioned by name in a Scenario."""
        return False

    def __str__(self) -> str:
        return f"--- #{self.seqno}: {self.__class__.__name__}"

Subclasses

Class variables

var current_seqno : int

Static methods

def incompatible_with() ‑> set[type[Capability]]

The capability classes that this action is not compatible with.

Expand source code Browse git
@classmethod
def incompatible_with(cls) -> set[type[Capability]]:
    """Return the capability classes this action cannot coexist with.

    The base implementation declares no incompatibilities.
    """
    return set()
def require_explicit_mention() ‑> bool

Only use if explicitly mentioned by name in a Scenario.

Expand source code Browse git
@classmethod
def require_explicit_mention(cls) -> bool:
    """Report whether this action is used only when a Scenario names it explicitly.

    The base implementation answers False.
    """
    return False
def requires() ‑> set[type[Capability]] | list[set[type[Capability]]]

Compute the capability classes that this action requires.

Expand source code Browse git
@classmethod
def requires(cls) -> set[type[Capability]] | list[set[type[Capability]]]:
    """Return the capability classes this action needs before it can run.

    The base implementation has no requirements and returns an empty set.
    """
    return set()

Methods

def provides(self) ‑> list[Capability]

Compute the capabilities that this action will make available.

Expand source code Browse git
def provides(self) -> list[Capability]:
    """Return the capabilities this action makes available.

    The base implementation provides nothing and returns an empty list.
    """
    return []
def run(self, c: Composition) ‑> None

Run this action on the provided composition.

Expand source code Browse git
def run(self, c: Composition) -> None:
    """Run this action on the provided composition.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    # Raise explicitly instead of `assert False`: assertions are removed
    # under `python -O`, which would turn this into a silent no-op.
    raise NotImplementedError
def withholds(self) ‑> set[type[Capability]]

Compute the capability classes that this action will make unavailable.

Expand source code Browse git
def withholds(self) -> set[type[Capability]]:
    """Return the capability classes this action makes unavailable.

    The base implementation withholds nothing and returns an empty set.
    """
    return set()
class ActionFactory

Base class for Action Factories that return parameterized Actions to execute.

Expand source code Browse git
class ActionFactory:
    """Base class for Action Factories that return parameterized Actions to execute."""

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Create the actions to execute, given the current capabilities.

        Raises:
            NotImplementedError: always, in this base class.
        """
        # Raise explicitly instead of `assert False`: assertions are removed
        # under `python -O`, which would make this silently return None.
        raise NotImplementedError

    @classmethod
    def requires(cls) -> set[type[Capability]] | list[set[type[Capability]]]:
        """Compute the capability classes that this Action Factory requires.

        Returns either one set of classes that must all be present, or a list
        of alternative sets of which one must be fully present.
        """
        return set()

    @classmethod
    def incompatible_with(cls) -> set[type[Capability]]:
        """The capability classes that this Action Factory is not compatible with."""
        return set()

Subclasses

Static methods

def incompatible_with() ‑> set[type[Capability]]

The capability classes that this action is not compatible with.

Expand source code Browse git
@classmethod
def incompatible_with(cls) -> set[type[Capability]]:
    """Return the capability classes this Action Factory cannot coexist with.

    The base implementation declares no incompatibilities.
    """
    return set()
def requires() ‑> set[type[Capability]] | list[set[type[Capability]]]

Compute the capability classes that this Action Factory requires.

Expand source code Browse git
@classmethod
def requires(cls) -> set[type[Capability]] | list[set[type[Capability]]]:
    """Return the capability classes this Action Factory needs.

    The base implementation has no requirements and returns an empty set.
    """
    return set()

Methods

def new(self, capabilities: Capabilities) ‑> list[Action]
Expand source code Browse git
def new(self, capabilities: Capabilities) -> list[Action]:
    """Create the actions to execute, given the current capabilities.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    # Raise explicitly instead of `assert False`: assertions are removed
    # under `python -O`, which would make this silently return None.
    raise NotImplementedError
class Capabilities

A set of Capability objects.

Expand source code Browse git
class Capabilities:
    """A set of `Capability`s.

    Lookups and type-based removal match on the exact class of each stored
    capability; an instance of a subclass does not match its parent type.
    """

    _capabilities: Sequence[Capability]

    def __init__(self) -> None:
        self._capabilities = []

    def _extend(self, capabilities: Sequence[Capability]) -> None:
        """Add new capabilities."""
        self._capabilities = [*self._capabilities, *capabilities]

    def remove_capability_instance(self, capability: Capability) -> None:
        """Remove every stored capability that compares equal to `capability`."""
        self._capabilities = [
            cap for cap in self._capabilities if not cap == capability
        ]

    def _remove(self, capabilities: set[type[T]]) -> None:
        """Remove all existing capabilities of the specified types."""
        self._capabilities = [
            cap for cap in self._capabilities if type(cap) not in capabilities
        ]

    def provides(self, capability: type[T]) -> bool:
        """Report whether any capability of the specified type exists."""
        return bool(self.get(capability))

    def get(self, capability: type[T]) -> list[T]:
        """Get all capabilities of the specified type, in insertion order."""
        matches: list[T] = [
            # NOTE: unfortunately pyright can't handle this
            cap
            for cap in self._capabilities
            # Exact-type match on purpose: subclasses are distinct capabilities.
            if type(cap) is capability  # type: ignore
        ]
        return matches

    def get_capability_names(self, capability: type[T]) -> list[str]:
        """Return the `name` of every stored capability of the specified type."""
        return [t.name for t in self.get(capability)]

    def get_free_capability_name(
        self, capability: type[T], max_objects: int
    ) -> str | None:
        """Pick a random unused name for a capability of the specified type.

        Candidate names are produced by formatting `capability.format_str()`
        with each index in `range(max_objects)`.

        Returns:
            A candidate name not used by any stored capability of that type,
            or None when every candidate is already taken.
        """
        taken = set(self.get_capability_names(capability))
        # Keep the candidates in index order instead of round-tripping them
        # through a set: string hashing is randomized per process, so choosing
        # from an unordered set would differ between runs even when the
        # `random` module is seeded identically.
        candidates = dict.fromkeys(
            capability.format_str().format(i) for i in range(max_objects)
        )
        free_names = [name for name in candidates if name not in taken]
        return random.choice(free_names) if free_names else None

Methods

def get(self, capability: type[~T]) ‑> list[~T]

Get all capabilities of the specified type.

Expand source code Browse git
def get(self, capability: type[T]) -> list[T]:
    """Return the stored capabilities whose exact type is `capability`.

    Matches keep the order in which they are stored; instances of
    subclasses of `capability` are not included.
    """
    matches: list[T] = []
    for candidate in self._capabilities:
        # NOTE: unfortunately pyright can't handle this
        if type(candidate) == capability:  # type: ignore
            matches.append(candidate)
    return matches
def get_capability_names(self, capability: type[~T]) ‑> list[str]
Expand source code Browse git
def get_capability_names(self, capability: type[T]) -> list[str]:
    """Return the `name` of every stored capability of the specified type."""
    names = []
    for match in self.get(capability):
        names.append(match.name)
    return names
def get_free_capability_name(self, capability: type[~T], max_objects: int) ‑> str | None
Expand source code Browse git
def get_free_capability_name(
    self, capability: type[T], max_objects: int
) -> str | None:
    """Pick a random unused name for a capability of the specified type.

    Candidate names are produced by formatting `capability.format_str()`
    with each index in `range(max_objects)`.

    Returns:
        A candidate name not used by any stored capability of that type,
        or None when every candidate is already taken.
    """
    taken = set(self.get_capability_names(capability))
    # Keep the candidates in index order instead of round-tripping them
    # through a set: string hashing is randomized per process, so choosing
    # from an unordered set would differ between runs even when the
    # `random` module is seeded identically.
    candidates = dict.fromkeys(
        capability.format_str().format(i) for i in range(max_objects)
    )
    free_names = [name for name in candidates if name not in taken]
    return random.choice(free_names) if free_names else None
def provides(self, capability: type[~T]) ‑> bool

Report whether any capability of the specified type exists.

Expand source code Browse git
def provides(self, capability: type[T]) -> bool:
    """Tell whether at least one capability of the specified type is stored."""
    matching = self.get(capability)
    return 0 < len(matching)
def remove_capability_instance(self, capability: Capability) ‑> None

Remove a specific capability.

Expand source code Browse git
def remove_capability_instance(self, capability: Capability) -> None:
    """Drop every stored capability that compares equal to `capability`."""
    kept = []
    for existing in self._capabilities:
        if existing == capability:
            continue
        kept.append(existing)
    self._capabilities = kept
class Capability

Base class for a Zippy capability.

A capability represents a condition that is true about a Zippy test context, like "a table with name 'foo' exists".

Expand source code Browse git
class Capability:
    """Abstract description of one fact that holds in a Zippy test context.

    Each instance stands for a single true condition, for example
    "a table with name 'foo' exists".
    """

    # Name of the object this capability refers to.
    name: str

    @classmethod
    def format_str(cls) -> str:
        """Return the format string used to build names for this capability type.

        The base class defines no such string and always raises
        `NotImplementedError`.
        """
        raise NotImplementedError

Subclasses

Class variables

var name : str

Static methods

def format_str() ‑> str
Expand source code Browse git
@classmethod
def format_str(cls) -> str:
    """Return the format string used to build names for this capability type.

    This base implementation defines none and always raises
    `NotImplementedError`.
    """
    raise NotImplementedError
class Test (scenario: Scenario, actions: int, max_execution_time: datetime.timedelta)

A Zippy test, consisting of a sequence of actions.

Generate a new Zippy test.

Args

scenario
The Scenario to pick actions from.
actions
The number of actions to generate.
Expand source code Browse git
class Test:
    """A Zippy test, consisting of a sequence of actions."""

    def __init__(
        self, scenario: "Scenario", actions: int, max_execution_time: timedelta
    ) -> None:
        """Generate a new Zippy test.

        Args:
            scenario: The Scenario to pick actions from.
            actions: The number of actions to generate.
            max_execution_time: Time budget for the generated actions; once it
                is exceeded, `run` skips ahead to the finalization actions.
        """
        self._scenario = scenario
        self._actions: list[Action] = []
        self._final_actions: list[Action] = []
        self._capabilities = Capabilities()
        self._actions_with_weight: dict[
            ActionOrFactory, float
        ] = self._scenario.actions_with_weight()
        self._max_execution_time: timedelta = max_execution_time

        # Bootstrap actions count towards the requested number of actions.
        for action_or_factory in self._scenario.bootstrap():
            self._actions.extend(self.generate_actions(action_or_factory))

        while len(self._actions) < actions:
            action_or_factory = self._pick_action_or_factory()
            self._actions.extend(self.generate_actions(action_or_factory))

        # Finalization actions are kept separately so that `run` executes them
        # even when the time budget cuts the main sequence short.
        for action_or_factory in self._scenario.finalization():
            self._final_actions.extend(self.generate_actions(action_or_factory))

    def generate_actions(self, action_def: ActionOrFactory) -> list[Action]:
        """Instantiate the actions for `action_def` and track their effects.

        An `ActionFactory` instance is asked for its actions via `new()`; an
        `Action` subclass is instantiated once. For every resulting action,
        the capabilities it provides are added to this test's capability set
        and the capability types it withholds are removed from it.

        Raises:
            AssertionError: if `action_def` is a class that is not an
                `Action` subclass.
        """
        if isinstance(action_def, ActionFactory):
            actions = action_def.new(capabilities=self._capabilities)
        elif issubclass(action_def, Action):
            actions = [action_def(capabilities=self._capabilities)]
        else:
            # Raised explicitly (not via `assert`) so the check survives
            # `python -O` instead of leaving `actions` unbound.
            raise AssertionError(f"Unsupported action definition: {action_def!r}")

        for action in actions:
            print("test:", action)
            # Query each hook exactly once and reuse the result for the log
            # line, so the logged value is the one that was applied.
            provided = action.provides()
            self._capabilities._extend(provided)
            print(" - ", self._capabilities, provided)
            withheld = action.withholds()
            self._capabilities._remove(withheld)
            print(" - ", self._capabilities, withheld)

        return actions

    def run(self, c: Composition) -> None:
        """Run the Zippy test.

        The main actions run in order. The time budget is checked after each
        one completes; once exceeded, the remaining main actions are skipped.
        The finalization actions run afterwards in either case.
        """
        max_time = datetime.now() + self._max_execution_time
        for action in self._actions:
            print(action)
            action.run(c)
            if datetime.now() > max_time:
                print(
                    f"--- Desired execution time of {self._max_execution_time} has been reached."
                )
                break

        for action in self._final_actions:
            print(action)
            action.run(c)

    def _pick_action_or_factory(self) -> ActionOrFactory:
        """Select the next Action to run in the Test.

        Each scenario entry's weight is split evenly among those of its
        alternatives that can currently run; one alternative is then drawn at
        random according to these weights.
        """
        actions_or_factories: list[ActionOrFactory] = []
        class_weights: list[float] = []

        for action_or_factory, weight in self._actions_with_weight.items():
            # We do not drill down if it is an ActionFactory
            # If it is an Action, drill down for any children
            subclasses: list[ActionOrFactory] = (
                [action_or_factory]
                if isinstance(action_or_factory, ActionFactory)
                else self._all_subclasses(action_or_factory)
            )

            # Do not pick an Action whose requirements can not be satisfied
            alternatives = [sub for sub in subclasses if self._can_run(sub)]

            actions_or_factories.extend(alternatives)
            # Lazy generator: the division never runs when nothing qualifies.
            class_weights.extend(weight / len(alternatives) for _ in alternatives)

        assert (
            len(actions_or_factories) > 0
        ), "No actions available to take. You may be stopping or deleting items without starting them again."

        return random.choices(actions_or_factories, weights=class_weights, k=1)[0]

    def _can_run(self, action: ActionOrFactory) -> bool:
        """Report whether `action` is runnable with the current capabilities.

        It is not runnable if any capability type it is incompatible with is
        present. Otherwise a set of requirements must be present in full, and
        a list of requirement sets needs one set that is present in full.
        """
        if any(
            self._capabilities.provides(dislike)
            for dislike in action.incompatible_with()
        ):
            return False

        requires = action.requires()
        if isinstance(requires, set):
            return all(self._capabilities.provides(req) for req in requires)
        return any(
            all(self._capabilities.provides(req) for req in one_alternative)
            for one_alternative in requires
        )

    def _all_subclasses(self, cls: type[Action]) -> list[ActionOrFactory]:
        """Return the leaf Actions below `cls`, or `cls` itself if it has none.

        Subclasses that require explicit mention are ignored when descending.
        """
        children = [c for c in cls.__subclasses__() if not c.require_explicit_mention()]
        if not children:
            return [cls]
        subclasses: list[ActionOrFactory] = []
        for c in children:
            subclasses.extend(self._all_subclasses(c))
        return subclasses

Methods

def generate_actions(self, action_def: Union[type['Action'], ForwardRef('ActionFactory')]) ‑> list[Action]
Expand source code Browse git
def generate_actions(self, action_def: ActionOrFactory) -> list[Action]:
    """Instantiate the actions for `action_def` and track their effects.

    An `ActionFactory` instance is asked for its actions via `new()`; an
    `Action` subclass is instantiated once. For every resulting action,
    the capabilities it provides are added to this test's capability set
    and the capability types it withholds are removed from it.

    Raises:
        AssertionError: if `action_def` is a class that is not an
            `Action` subclass.
    """
    if isinstance(action_def, ActionFactory):
        actions = action_def.new(capabilities=self._capabilities)
    elif issubclass(action_def, Action):
        actions = [action_def(capabilities=self._capabilities)]
    else:
        # Raised explicitly (not via `assert`) so the check survives
        # `python -O` instead of leaving `actions` unbound.
        raise AssertionError(f"Unsupported action definition: {action_def!r}")

    for action in actions:
        print("test:", action)
        # Query each hook exactly once and reuse the result for the log
        # line, so the logged value is the one that was applied.
        provided = action.provides()
        self._capabilities._extend(provided)
        print(" - ", self._capabilities, provided)
        withheld = action.withholds()
        self._capabilities._remove(withheld)
        print(" - ", self._capabilities, withheld)

    return actions
def run(self, c: Composition) ‑> None

Run the Zippy test.

Expand source code Browse git
def run(self, c: Composition) -> None:
    """Execute the test's actions against the composition `c`.

    Main actions run in order, each printed before it runs. After an action
    completes, the elapsed time is compared with the configured maximum; if
    it has been exceeded, a notice is printed and the remaining main actions
    are skipped. The final actions are then printed and run in either case.
    """
    deadline = datetime.now() + self._max_execution_time
    for step in self._actions:
        print(step)
        step.run(c)
        if datetime.now() <= deadline:
            continue
        print(
            f"--- Desired execution time of {self._max_execution_time} has been reached."
        )
        break

    for final_step in self._final_actions:
        print(final_step)
        final_step.run(c)