Module materialize.checks.scenarios
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from random import Random
from materialize.checks.actions import Action, Initialize, Manipulate, Validate
from materialize.checks.checks import Check
from materialize.checks.cloudtest_actions import ReplaceEnvironmentdStatefulSet
from materialize.checks.executors import Executor
from materialize.checks.mzcompose_actions import (
ConfigureMz,
KillClusterdCompute,
KillMz,
StartClusterdCompute,
StartMz,
UseClusterdCompute,
)
from materialize.checks.mzcompose_actions import (
DropCreateDefaultReplica as DropCreateDefaultReplicaAction,
)
from materialize.checks.mzcompose_actions import (
KillClusterdStorage as KillClusterdStorageAction,
)
from materialize.checks.mzcompose_actions import (
RestartCockroach as RestartCockroachAction,
)
from materialize.checks.mzcompose_actions import (
RestartRedpandaDebezium as RestartRedpandaDebeziumAction,
)
from materialize.checks.mzcompose_actions import (
RestartSourcePostgres as RestartSourcePostgresAction,
)
from materialize.mz_version import MzVersion
class Scenario:
def __init__(
self, checks: list[type[Check]], executor: Executor, seed: str | None = None
) -> None:
self._checks = checks
self.executor = executor
self.rng = None if seed is None else Random(seed)
self._base_version = MzVersion.parse_cargo()
filtered_check_classes = []
for check_class in self.checks():
if self._include_check_class(check_class):
filtered_check_classes.append(check_class)
# Use base_version() here instead of _base_version so that overwriting
# upgrade scenarios can specify another base version.
self.check_objects = [
check_class(self.base_version(), self.rng)
for check_class in filtered_check_classes
]
def checks(self) -> list[type[Check]]:
if self.rng:
self.rng.shuffle(self._checks)
return self._checks
def actions(self) -> list[Action]:
assert False
def base_version(self) -> MzVersion:
return self._base_version
def run(self) -> None:
actions = self.actions()
# Configure implicitly for cloud scenarios
if not isinstance(actions[0], StartMz):
actions.insert(0, ConfigureMz(self))
for index, action in enumerate(actions):
# Implicitly call configure to raise version-dependent limits
if isinstance(action, StartMz) and not any(
env.startswith("MZ_DEPLOY_GENERATION=")
for env in action.environment_extra
):
actions.insert(
index + 1, ConfigureMz(self, mz_service=action.mz_service)
)
elif isinstance(action, ReplaceEnvironmentdStatefulSet):
actions.insert(index + 1, ConfigureMz(self))
for action in actions:
action.execute(self.executor)
action.join(self.executor)
def requires_external_idempotence(self) -> bool:
return False
def _include_check_class(self, check_class: type[Check]) -> bool:
return not check_class.__name__.endswith("Base") and (
not self.requires_external_idempotence()
or check_class.externally_idempotent
)
class NoRestartNoUpgrade(Scenario):
def actions(self) -> list[Action]:
return [
StartMz(self),
Initialize(self),
Manipulate(self, phase=1),
Manipulate(self, phase=2),
Validate(self),
]
class RestartEntireMz(Scenario):
def actions(self) -> list[Action]:
return [
StartMz(self),
Initialize(self),
KillMz(),
StartMz(self),
Manipulate(self, phase=1),
KillMz(),
StartMz(self),
Manipulate(self, phase=2),
KillMz(),
StartMz(self),
Validate(self),
]
class DropCreateDefaultReplica(Scenario):
def actions(self) -> list[Action]:
return [
StartMz(self),
Initialize(self),
Manipulate(self, phase=1),
DropCreateDefaultReplicaAction(self),
Manipulate(self, phase=2),
Validate(self),
]
class RestartClusterdCompute(Scenario):
"""Restart clusterd by having it run in a separate container that is then killed and restarted."""
def actions(self) -> list[Action]:
return [
StartMz(self),
StartClusterdCompute(),
UseClusterdCompute(self),
Initialize(self),
KillClusterdCompute(),
StartClusterdCompute(),
Manipulate(self, phase=1),
KillClusterdCompute(),
StartClusterdCompute(),
Manipulate(self, phase=2),
KillClusterdCompute(),
StartClusterdCompute(),
Validate(self),
]
class RestartEnvironmentdClusterdStorage(Scenario):
"""Restart environmentd and storage clusterds (as spawned from it), while keeping computed running by placing it in a separate container."""
def actions(self) -> list[Action]:
return [
StartMz(self),
StartClusterdCompute(),
UseClusterdCompute(self),
Initialize(self),
KillMz(),
StartMz(self),
Manipulate(self, phase=1),
KillMz(),
StartMz(self),
Manipulate(self, phase=2),
KillMz(),
StartMz(self),
Validate(self),
# Validate again so that introducing non-idempotent validate()s
# will cause the CI to fail.
Validate(self),
]
class KillClusterdStorage(Scenario):
"""Kill storage clusterd while it is running inside the enviromentd container. The process orchestrator will (try to) start it again."""
def actions(self) -> list[Action]:
return [
StartMz(self),
StartClusterdCompute(),
UseClusterdCompute(self),
Initialize(self),
KillClusterdStorageAction(),
Manipulate(self, phase=1),
KillClusterdStorageAction(),
Manipulate(self, phase=2),
KillClusterdStorageAction(),
Validate(self),
]
class RestartCockroach(Scenario):
def actions(self) -> list[Action]:
return [
StartMz(self),
Initialize(self),
RestartCockroachAction(),
Manipulate(self, phase=1),
RestartCockroachAction(),
Manipulate(self, phase=2),
RestartCockroachAction(),
Validate(self),
]
class RestartSourcePostgres(Scenario):
def actions(self) -> list[Action]:
return [
StartMz(self),
Initialize(self),
RestartSourcePostgresAction(),
Manipulate(self, phase=1),
RestartSourcePostgresAction(),
Manipulate(self, phase=2),
RestartSourcePostgresAction(),
Validate(self),
]
class RestartRedpandaDebezium(Scenario):
def actions(self) -> list[Action]:
return [
StartMz(self),
Initialize(self),
RestartRedpandaDebeziumAction(),
Manipulate(self, phase=1),
RestartRedpandaDebeziumAction(),
Manipulate(self, phase=2),
RestartRedpandaDebeziumAction(),
Validate(self),
]
Classes
class DropCreateDefaultReplica (checks: list[type[Check]], executor: Executor, seed: str | None = None)
-
Expand source code Browse git
class DropCreateDefaultReplica(Scenario): def actions(self) -> list[Action]: return [ StartMz(self), Initialize(self), Manipulate(self, phase=1), DropCreateDefaultReplicaAction(self), Manipulate(self, phase=2), Validate(self), ]
Ancestors
Methods
def actions(self) ‑> list[Action]
-
Expand source code Browse git
def actions(self) -> list[Action]: return [ StartMz(self), Initialize(self), Manipulate(self, phase=1), DropCreateDefaultReplicaAction(self), Manipulate(self, phase=2), Validate(self), ]
class KillClusterdStorage (checks: list[type[Check]], executor: Executor, seed: str | None = None)
-
Kill storage clusterd while it is running inside the enviromentd container. The process orchestrator will (try to) start it again.
Expand source code Browse git
class KillClusterdStorage(Scenario): """Kill storage clusterd while it is running inside the enviromentd container. The process orchestrator will (try to) start it again.""" def actions(self) -> list[Action]: return [ StartMz(self), StartClusterdCompute(), UseClusterdCompute(self), Initialize(self), KillClusterdStorageAction(), Manipulate(self, phase=1), KillClusterdStorageAction(), Manipulate(self, phase=2), KillClusterdStorageAction(), Validate(self), ]
Ancestors
Methods
def actions(self) ‑> list[Action]
-
Expand source code Browse git
def actions(self) -> list[Action]: return [ StartMz(self), StartClusterdCompute(), UseClusterdCompute(self), Initialize(self), KillClusterdStorageAction(), Manipulate(self, phase=1), KillClusterdStorageAction(), Manipulate(self, phase=2), KillClusterdStorageAction(), Validate(self), ]
class NoRestartNoUpgrade (checks: list[type[Check]], executor: Executor, seed: str | None = None)
-
Expand source code Browse git
class NoRestartNoUpgrade(Scenario): def actions(self) -> list[Action]: return [ StartMz(self), Initialize(self), Manipulate(self, phase=1), Manipulate(self, phase=2), Validate(self), ]
Ancestors
Methods
def actions(self) ‑> list[Action]
-
Expand source code Browse git
def actions(self) -> list[Action]: return [ StartMz(self), Initialize(self), Manipulate(self, phase=1), Manipulate(self, phase=2), Validate(self), ]
class RestartClusterdCompute (checks: list[type[Check]], executor: Executor, seed: str | None = None)
-
Restart clusterd by having it run in a separate container that is then killed and restarted.
Expand source code Browse git
class RestartClusterdCompute(Scenario): """Restart clusterd by having it run in a separate container that is then killed and restarted.""" def actions(self) -> list[Action]: return [ StartMz(self), StartClusterdCompute(), UseClusterdCompute(self), Initialize(self), KillClusterdCompute(), StartClusterdCompute(), Manipulate(self, phase=1), KillClusterdCompute(), StartClusterdCompute(), Manipulate(self, phase=2), KillClusterdCompute(), StartClusterdCompute(), Validate(self), ]
Ancestors
Methods
def actions(self) ‑> list[Action]
-
Expand source code Browse git
def actions(self) -> list[Action]: return [ StartMz(self), StartClusterdCompute(), UseClusterdCompute(self), Initialize(self), KillClusterdCompute(), StartClusterdCompute(), Manipulate(self, phase=1), KillClusterdCompute(), StartClusterdCompute(), Manipulate(self, phase=2), KillClusterdCompute(), StartClusterdCompute(), Validate(self), ]
class RestartCockroach (checks: list[type[Check]], executor: Executor, seed: str | None = None)
-
Expand source code Browse git
class RestartCockroach(Scenario): def actions(self) -> list[Action]: return [ StartMz(self), Initialize(self), RestartCockroachAction(), Manipulate(self, phase=1), RestartCockroachAction(), Manipulate(self, phase=2), RestartCockroachAction(), Validate(self), ]
Ancestors
Methods
def actions(self) ‑> list[Action]
-
Expand source code Browse git
def actions(self) -> list[Action]: return [ StartMz(self), Initialize(self), RestartCockroachAction(), Manipulate(self, phase=1), RestartCockroachAction(), Manipulate(self, phase=2), RestartCockroachAction(), Validate(self), ]
class RestartEntireMz (checks: list[type[Check]], executor: Executor, seed: str | None = None)
-
Expand source code Browse git
class RestartEntireMz(Scenario): def actions(self) -> list[Action]: return [ StartMz(self), Initialize(self), KillMz(), StartMz(self), Manipulate(self, phase=1), KillMz(), StartMz(self), Manipulate(self, phase=2), KillMz(), StartMz(self), Validate(self), ]
Ancestors
Methods
def actions(self) ‑> list[Action]
-
Expand source code Browse git
def actions(self) -> list[Action]: return [ StartMz(self), Initialize(self), KillMz(), StartMz(self), Manipulate(self, phase=1), KillMz(), StartMz(self), Manipulate(self, phase=2), KillMz(), StartMz(self), Validate(self), ]
class RestartEnvironmentdClusterdStorage (checks: list[type[Check]], executor: Executor, seed: str | None = None)
-
Restart environmentd and storage clusterds (as spawned from it), while keeping computed running by placing it in a separate container.
Expand source code Browse git
class RestartEnvironmentdClusterdStorage(Scenario): """Restart environmentd and storage clusterds (as spawned from it), while keeping computed running by placing it in a separate container.""" def actions(self) -> list[Action]: return [ StartMz(self), StartClusterdCompute(), UseClusterdCompute(self), Initialize(self), KillMz(), StartMz(self), Manipulate(self, phase=1), KillMz(), StartMz(self), Manipulate(self, phase=2), KillMz(), StartMz(self), Validate(self), # Validate again so that introducing non-idempotent validate()s # will cause the CI to fail. Validate(self), ]
Ancestors
Methods
def actions(self) ‑> list[Action]
-
Expand source code Browse git
def actions(self) -> list[Action]: return [ StartMz(self), StartClusterdCompute(), UseClusterdCompute(self), Initialize(self), KillMz(), StartMz(self), Manipulate(self, phase=1), KillMz(), StartMz(self), Manipulate(self, phase=2), KillMz(), StartMz(self), Validate(self), # Validate again so that introducing non-idempotent validate()s # will cause the CI to fail. Validate(self), ]
class RestartRedpandaDebezium (checks: list[type[Check]], executor: Executor, seed: str | None = None)
-
Expand source code Browse git
class RestartRedpandaDebezium(Scenario): def actions(self) -> list[Action]: return [ StartMz(self), Initialize(self), RestartRedpandaDebeziumAction(), Manipulate(self, phase=1), RestartRedpandaDebeziumAction(), Manipulate(self, phase=2), RestartRedpandaDebeziumAction(), Validate(self), ]
Ancestors
Methods
def actions(self) ‑> list[Action]
-
Expand source code Browse git
def actions(self) -> list[Action]: return [ StartMz(self), Initialize(self), RestartRedpandaDebeziumAction(), Manipulate(self, phase=1), RestartRedpandaDebeziumAction(), Manipulate(self, phase=2), RestartRedpandaDebeziumAction(), Validate(self), ]
class RestartSourcePostgres (checks: list[type[Check]], executor: Executor, seed: str | None = None)
-
Expand source code Browse git
class RestartSourcePostgres(Scenario): def actions(self) -> list[Action]: return [ StartMz(self), Initialize(self), RestartSourcePostgresAction(), Manipulate(self, phase=1), RestartSourcePostgresAction(), Manipulate(self, phase=2), RestartSourcePostgresAction(), Validate(self), ]
Ancestors
Methods
def actions(self) ‑> list[Action]
-
Expand source code Browse git
def actions(self) -> list[Action]: return [ StartMz(self), Initialize(self), RestartSourcePostgresAction(), Manipulate(self, phase=1), RestartSourcePostgresAction(), Manipulate(self, phase=2), RestartSourcePostgresAction(), Validate(self), ]
class Scenario (checks: list[type[Check]], executor: Executor, seed: str | None = None)
-
Expand source code Browse git
class Scenario: def __init__( self, checks: list[type[Check]], executor: Executor, seed: str | None = None ) -> None: self._checks = checks self.executor = executor self.rng = None if seed is None else Random(seed) self._base_version = MzVersion.parse_cargo() filtered_check_classes = [] for check_class in self.checks(): if self._include_check_class(check_class): filtered_check_classes.append(check_class) # Use base_version() here instead of _base_version so that overwriting # upgrade scenarios can specify another base version. self.check_objects = [ check_class(self.base_version(), self.rng) for check_class in filtered_check_classes ] def checks(self) -> list[type[Check]]: if self.rng: self.rng.shuffle(self._checks) return self._checks def actions(self) -> list[Action]: assert False def base_version(self) -> MzVersion: return self._base_version def run(self) -> None: actions = self.actions() # Configure implicitly for cloud scenarios if not isinstance(actions[0], StartMz): actions.insert(0, ConfigureMz(self)) for index, action in enumerate(actions): # Implicitly call configure to raise version-dependent limits if isinstance(action, StartMz) and not any( env.startswith("MZ_DEPLOY_GENERATION=") for env in action.environment_extra ): actions.insert( index + 1, ConfigureMz(self, mz_service=action.mz_service) ) elif isinstance(action, ReplaceEnvironmentdStatefulSet): actions.insert(index + 1, ConfigureMz(self)) for action in actions: action.execute(self.executor) action.join(self.executor) def requires_external_idempotence(self) -> bool: return False def _include_check_class(self, check_class: type[Check]) -> bool: return not check_class.__name__.endswith("Base") and ( not self.requires_external_idempotence() or check_class.externally_idempotent )
Subclasses
- DropCreateDefaultReplica
- KillClusterdStorage
- NoRestartNoUpgrade
- RestartClusterdCompute
- RestartCockroach
- RestartEntireMz
- RestartEnvironmentdClusterdStorage
- RestartRedpandaDebezium
- RestartSourcePostgres
- BackupAndRestoreAfterManipulate
- BackupAndRestoreBeforeManipulate
- BackupAndRestoreMulti
- BackupAndRestoreToPreviousState
- MigrateAarch64X86
- MigrateX86Aarch64
- PersistTxnFencing
- PersistTxnToggle
- PersistTxnFencing
- PersistTxnToggle
- PreflightCheckContinue
- PreflightCheckRollback
- UpgradeClusterdComputeFirst
- UpgradeClusterdComputeLast
- UpgradeEntireMz
- UpgradeEntireMzFourVersions
- UpgradeEntireMzTwoVersions
Methods
def actions(self) ‑> list[Action]
-
Expand source code Browse git
def actions(self) -> list[Action]: assert False
def base_version(self) ‑> MzVersion
-
Expand source code Browse git
def base_version(self) -> MzVersion: return self._base_version
def checks(self) ‑> list[type[Check]]
-
Expand source code Browse git
def checks(self) -> list[type[Check]]: if self.rng: self.rng.shuffle(self._checks) return self._checks
def requires_external_idempotence(self) ‑> bool
-
Expand source code Browse git
def requires_external_idempotence(self) -> bool: return False
def run(self) ‑> None
-
Expand source code Browse git
def run(self) -> None: actions = self.actions() # Configure implicitly for cloud scenarios if not isinstance(actions[0], StartMz): actions.insert(0, ConfigureMz(self)) for index, action in enumerate(actions): # Implicitly call configure to raise version-dependent limits if isinstance(action, StartMz) and not any( env.startswith("MZ_DEPLOY_GENERATION=") for env in action.environment_extra ): actions.insert( index + 1, ConfigureMz(self, mz_service=action.mz_service) ) elif isinstance(action, ReplaceEnvironmentdStatefulSet): actions.insert(index + 1, ConfigureMz(self)) for action in actions: action.execute(self.executor) action.join(self.executor)