Module materialize.zippy.mz_actions

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.


from materialize.mzcompose.composition import Composition
from materialize.mzcompose.services.materialized import Materialized
from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
from materialize.zippy.crdb_capabilities import CockroachIsRunning
from materialize.zippy.framework import Action, ActionFactory, Capabilities, Capability
from materialize.zippy.minio_capabilities import MinioIsRunning
from materialize.zippy.mz_capabilities import MzIsRunning
from materialize.zippy.view_capabilities import ViewExists


class MzStartParameterized(ActionFactory):
    """Starts a Mz instance with custom parameters.

    The factory holds a set of additional system parameter defaults and passes
    them on to every `MzStart` action it creates.
    """

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capability types this factory requires."""
        return {CockroachIsRunning, MinioIsRunning}

    @classmethod
    def incompatible_with(cls) -> set[type[Capability]]:
        """Return the capability types this factory is incompatible with."""
        return {MzIsRunning}

    def __init__(
        self, additional_system_parameter_defaults: dict[str, str] = {}
    ) -> None:
        """Remember the extra system parameter defaults to start Mz with.

        Args:
            additional_system_parameter_defaults: parameter name -> value
                pairs forwarded to each `MzStart` action created by `new`.
        """
        # Keep a private copy. The `{}` default is one object shared by every
        # call, so storing it by reference would let a mutation through one
        # instance leak into all others (and into later default calls).
        self.additional_system_parameter_defaults = dict(
            additional_system_parameter_defaults
        )

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Return a one-element list holding an `MzStart` with the stored defaults."""
        return [
            MzStart(
                capabilities=capabilities,
                additional_system_parameter_defaults=self.additional_system_parameter_defaults,
            )
        ]


class MzStart(Action):
    """Starts a Mz instance (all components are running in the same container)."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capability types this action requires."""
        return {CockroachIsRunning, MinioIsRunning}

    @classmethod
    def incompatible_with(cls) -> set[type[Capability]]:
        """Return the capability types this action is incompatible with."""
        return {MzIsRunning}

    def __init__(
        self,
        capabilities: Capabilities,
        additional_system_parameter_defaults: dict[str, str] = {},
    ) -> None:
        """Create the action.

        Args:
            capabilities: passed through to the `Action` base constructor.
            additional_system_parameter_defaults: parameter name -> value
                pairs handed to the `Materialized` service definition in `run`.
        """
        # Keep a private copy. The `{}` default is one object shared by every
        # call, so storing it by reference would let a mutation through one
        # action leak into all others (and into later default calls).
        self.additional_system_parameter_defaults = dict(
            additional_system_parameter_defaults
        )
        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        """Bring up the `materialized` service and apply system settings.

        The service is started under a `Materialized` override that points
        Minio and Cockroach at "toxiproxy". All SQL below is issued as the
        `mz_system` user on port 6877.
        """
        print(
            f"Starting Mz with additional_system_parameter_defaults = {self.additional_system_parameter_defaults}"
        )

        with c.override(
            Materialized(
                external_minio="toxiproxy",
                external_cockroach="toxiproxy",
                sanity_restart=False,
                additional_system_parameter_defaults=self.additional_system_parameter_defaults,
            )
        ):
            c.up("materialized")

        # Object-count limits that are each set to 1000.
        limit_params = (
            "max_tables",
            "max_sources",
            "max_objects_per_schema",
            "max_materialized_views",
            "max_sinks",
        )
        for config_param in limit_params:
            c.sql(
                f"ALTER SYSTEM SET {config_param} TO 1000",
                user="mz_system",
                port=6877,
                print_statement=False,
            )

        # Switch the quickstart cluster to unmanaged.
        c.sql(
            """
            ALTER CLUSTER quickstart SET (MANAGED = false);
            """,
            user="mz_system",
            port=6877,
        )

        # Make sure all eligible LIMIT queries use the PeekPersist optimization
        c.sql(
            "ALTER SYSTEM SET persist_fast_path_limit = 1000000000",
            user="mz_system",
            port=6877,
        )

    def provides(self) -> list[Capability]:
        """Return the capabilities this action provides: a fresh `MzIsRunning`."""
        return [MzIsRunning()]


class MzStop(Action):
    """Kills the `materialized` service, taking the whole Mz instance down (all components share one container)."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capability types this action requires."""
        # Killing Mz does not strictly need balancerd to be up. The extra
        # requirement is a guard: without it, both frequently end up down at
        # the same time and Zippy spends a prolonged period restarting one or
        # the other while no other useful work gets done.
        prerequisites = {BalancerdIsRunning, MzIsRunning}
        return prerequisites

    def run(self, c: Composition) -> None:
        """Kill the `materialized` service in the composition."""
        service = "materialized"
        c.kill(service)

    def withholds(self) -> set[type[Capability]]:
        """Return the capability types this action withholds."""
        withheld = {MzIsRunning}
        return withheld


class MzRestart(Action):
    """Kills and then brings back up the whole Mz instance (all components share one container)."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capability types this action requires."""
        prerequisites = {MzIsRunning}
        return prerequisites

    def run(self, c: Composition) -> None:
        """Kill the `materialized` service, then start it again."""
        service = "materialized"
        c.kill(service)
        c.up(service)


class KillClusterd(Action):
    """Sends SIGKILL to the clusterd processes inside the environmentd container; the process orchestrator will restart them."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capability types this action requires."""
        prerequisites = {ViewExists, MzIsRunning}
        return prerequisites

    def run(self, c: Composition) -> None:
        """Run the kill command through bash inside the `materialized` service."""
        # clusterd may not be running for some workloads; the trailing
        # `|| true` keeps the shell command from failing in that case.
        shell_command = ("bash", "-c", "kill -9 `pidof clusterd` || true")
        c.exec("materialized", *shell_command)

Classes

class KillClusterd (capabilities: Capabilities)

Kills the clusterd processes in the environmentd container. The process orchestrator will restart them.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class KillClusterd(Action):
    """Sends SIGKILL to the clusterd processes inside the environmentd container; the process orchestrator will restart them."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capability types this action requires."""
        prerequisites = {ViewExists, MzIsRunning}
        return prerequisites

    def run(self, c: Composition) -> None:
        """Run the kill command through bash inside the `materialized` service."""
        # clusterd may not be running for some workloads; the trailing
        # `|| true` keeps the shell command from failing in that case.
        shell_command = ("bash", "-c", "kill -9 `pidof clusterd` || true")
        c.exec("materialized", *shell_command)

Ancestors

Inherited members

class MzRestart (capabilities: Capabilities)

Restarts the entire Mz instance (all components are running in the same container).

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class MzRestart(Action):
    """Kills and then brings back up the whole Mz instance (all components share one container)."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capability types this action requires."""
        prerequisites = {MzIsRunning}
        return prerequisites

    def run(self, c: Composition) -> None:
        """Kill the `materialized` service, then start it again."""
        service = "materialized"
        c.kill(service)
        c.up(service)

Ancestors

Inherited members

class MzStart (capabilities: Capabilities, additional_system_parameter_defaults: dict[str, str] = {})

Starts a Mz instance (all components are running in the same container).

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class MzStart(Action):
    """Starts a Mz instance (all components are running in the same container)."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capability types this action requires."""
        return {CockroachIsRunning, MinioIsRunning}

    @classmethod
    def incompatible_with(cls) -> set[type[Capability]]:
        """Return the capability types this action is incompatible with."""
        return {MzIsRunning}

    def __init__(
        self,
        capabilities: Capabilities,
        additional_system_parameter_defaults: dict[str, str] = {},
    ) -> None:
        """Create the action.

        Args:
            capabilities: passed through to the `Action` base constructor.
            additional_system_parameter_defaults: parameter name -> value
                pairs handed to the `Materialized` service definition in `run`.
        """
        # Keep a private copy. The `{}` default is one object shared by every
        # call, so storing it by reference would let a mutation through one
        # action leak into all others (and into later default calls).
        self.additional_system_parameter_defaults = dict(
            additional_system_parameter_defaults
        )
        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        """Bring up the `materialized` service and apply system settings.

        The service is started under a `Materialized` override that points
        Minio and Cockroach at "toxiproxy". All SQL below is issued as the
        `mz_system` user on port 6877.
        """
        print(
            f"Starting Mz with additional_system_parameter_defaults = {self.additional_system_parameter_defaults}"
        )

        with c.override(
            Materialized(
                external_minio="toxiproxy",
                external_cockroach="toxiproxy",
                sanity_restart=False,
                additional_system_parameter_defaults=self.additional_system_parameter_defaults,
            )
        ):
            c.up("materialized")

        # Object-count limits that are each set to 1000.
        limit_params = (
            "max_tables",
            "max_sources",
            "max_objects_per_schema",
            "max_materialized_views",
            "max_sinks",
        )
        for config_param in limit_params:
            c.sql(
                f"ALTER SYSTEM SET {config_param} TO 1000",
                user="mz_system",
                port=6877,
                print_statement=False,
            )

        # Switch the quickstart cluster to unmanaged.
        c.sql(
            """
            ALTER CLUSTER quickstart SET (MANAGED = false);
            """,
            user="mz_system",
            port=6877,
        )

        # Make sure all eligible LIMIT queries use the PeekPersist optimization
        c.sql(
            "ALTER SYSTEM SET persist_fast_path_limit = 1000000000",
            user="mz_system",
            port=6877,
        )

    def provides(self) -> list[Capability]:
        """Return the capabilities this action provides: a fresh `MzIsRunning`."""
        return [MzIsRunning()]

Ancestors

Inherited members

class MzStartParameterized (additional_system_parameter_defaults: dict[str, str] = {})

Starts a Mz instance with custom parameters.

Expand source code Browse git
class MzStartParameterized(ActionFactory):
    """Starts a Mz instance with custom parameters.

    The factory holds a set of additional system parameter defaults and passes
    them on to every `MzStart` action it creates.
    """

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capability types this factory requires."""
        return {CockroachIsRunning, MinioIsRunning}

    @classmethod
    def incompatible_with(cls) -> set[type[Capability]]:
        """Return the capability types this factory is incompatible with."""
        return {MzIsRunning}

    def __init__(
        self, additional_system_parameter_defaults: dict[str, str] = {}
    ) -> None:
        """Remember the extra system parameter defaults to start Mz with.

        Args:
            additional_system_parameter_defaults: parameter name -> value
                pairs forwarded to each `MzStart` action created by `new`.
        """
        # Keep a private copy. The `{}` default is one object shared by every
        # call, so storing it by reference would let a mutation through one
        # instance leak into all others (and into later default calls).
        self.additional_system_parameter_defaults = dict(
            additional_system_parameter_defaults
        )

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Return a one-element list holding an `MzStart` with the stored defaults."""
        return [
            MzStart(
                capabilities=capabilities,
                additional_system_parameter_defaults=self.additional_system_parameter_defaults,
            )
        ]

Ancestors

Methods

def new(self, capabilities: Capabilities) ‑> list[Action]
Expand source code Browse git
def new(self, capabilities: Capabilities) -> list[Action]:
    """Build one `MzStart` action carrying this factory's parameter defaults and return it in a list."""
    start_action = MzStart(
        capabilities=capabilities,
        additional_system_parameter_defaults=self.additional_system_parameter_defaults,
    )
    return [start_action]

Inherited members

class MzStop (capabilities: Capabilities)

Stops the entire Mz instance (all components are running in the same container).

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class MzStop(Action):
    """Kills the `materialized` service, taking the whole Mz instance down (all components share one container)."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capability types this action requires."""
        # Killing Mz does not strictly need balancerd to be up. The extra
        # requirement is a guard: without it, both frequently end up down at
        # the same time and Zippy spends a prolonged period restarting one or
        # the other while no other useful work gets done.
        prerequisites = {BalancerdIsRunning, MzIsRunning}
        return prerequisites

    def run(self, c: Composition) -> None:
        """Kill the `materialized` service in the composition."""
        service = "materialized"
        c.kill(service)

    def withholds(self) -> set[type[Capability]]:
        """Return the capability types this action withholds."""
        withheld = {MzIsRunning}
        return withheld

Ancestors

Inherited members