Module materialize.checks.mzcompose_actions

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

from typing import List, Optional

from materialize.checks.actions import Action
from materialize.checks.executors import Executor
from materialize.mzcompose.services import Clusterd, Materialized


class MzcomposeAction(Action):
    pass


class StartMz(MzcomposeAction):
    def __init__(
        self, tag: Optional[str] = None, environment_extra: List[str] = []
    ) -> None:
        self.tag = tag
        self.environment_extra = environment_extra

    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()

        image = f"materialize/materialized:{self.tag}" if self.tag is not None else None
        print(f"Starting Mz using image {image}")
        mz = Materialized(
            image=image,
            external_cockroach=True,
            environment_extra=self.environment_extra,
        )

        with c.override(mz):
            c.up("materialized")

        for config_param in ["max_tables", "max_sources"]:
            c.sql(
                f"ALTER SYSTEM SET {config_param} TO 1000",
                user="mz_system",
                port=6877,
            )


class KillMz(MzcomposeAction):
    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()
        c.kill("materialized")


class UseClusterdCompute(MzcomposeAction):
    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()

        c.sql(
            """
            DROP CLUSTER REPLICA default.r1;
            CREATE CLUSTER REPLICA default.r1
                STORAGECTL ADDRESS 'clusterd_compute_1:2100',
                COMPUTECTL ADDRESSES ['clusterd_compute_1:2101'],
                COMPUTE ADDRESSES ['clusterd_compute_1:2102'],
                WORKERS 1;
        """
        )


class KillClusterdCompute(MzcomposeAction):
    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()
        with c.override(Clusterd(name="clusterd_compute_1")):
            c.kill("clusterd_compute_1")


class StartClusterdCompute(MzcomposeAction):
    def __init__(self, tag: Optional[str] = None) -> None:
        self.tag = tag

    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()

        clusterd = Clusterd(name="clusterd_compute_1")
        if self.tag:
            # TODO(benesch): remove this conditional once v0.39 ships.
            if any(self.tag.startswith(version) for version in ["v0.37", "v0.38"]):
                clusterd = Clusterd(
                    name="clusterd_compute_1",
                    image=f"materialize/clusterd:{self.tag}",
                    options=[
                        "--compute-controller-listen-addr=0.0.0.0:2101",
                        "--secrets-reader=process",
                        "--secrets-reader-process-dir=/mzdata/secrets",
                    ],
                    storage_workers=None,
                )
            else:
                clusterd = Clusterd(
                    name="clusterd_compute_1",
                    image=f"materialize/clusterd:{self.tag}",
                )
        print(f"Starting Compute using image {clusterd.config.get('image')}")

        with c.override(clusterd):
            c.up("clusterd_compute_1")


class RestartRedpandaDebezium(MzcomposeAction):
    """Restarts Redpanda and Debezium. Debezium is unable to survive Redpanda restarts so the two go together."""

    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()

        for service in ["redpanda", "debezium"]:
            c.kill(service)
            c.up(service)


class RestartCockroach(MzcomposeAction):
    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()

        c.kill("cockroach")
        c.up("cockroach")


class RestartSourcePostgres(MzcomposeAction):
    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()

        c.kill("postgres")
        c.up("postgres")


class KillClusterdStorage(MzcomposeAction):
    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()

        # Depending on the workload, clusterd may not be running, hence the || true
        c.exec("materialized", "bash", "-c", "kill -9 `pidof clusterd` || true")


class DropCreateDefaultReplica(MzcomposeAction):
    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()

        c.sql(
            """
           DROP CLUSTER REPLICA default.r1;
           CREATE CLUSTER REPLICA default.r1 SIZE '1';
        """
        )

Classes

class DropCreateDefaultReplica
Expand source code Browse git
class DropCreateDefaultReplica(MzcomposeAction):
    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()

        c.sql(
            """
           DROP CLUSTER REPLICA default.r1;
           CREATE CLUSTER REPLICA default.r1 SIZE '1';
        """
        )

Ancestors

Methods

def execute(self, e: Executor) ‑> None
Expand source code Browse git
def execute(self, e: Executor) -> None:
    c = e.mzcompose_composition()

    c.sql(
        """
       DROP CLUSTER REPLICA default.r1;
       CREATE CLUSTER REPLICA default.r1 SIZE '1';
    """
    )
class KillClusterdCompute
Expand source code Browse git
class KillClusterdCompute(MzcomposeAction):
    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()
        with c.override(Clusterd(name="clusterd_compute_1")):
            c.kill("clusterd_compute_1")

Ancestors

Methods

def execute(self, e: Executor) ‑> None
Expand source code Browse git
def execute(self, e: Executor) -> None:
    c = e.mzcompose_composition()
    with c.override(Clusterd(name="clusterd_compute_1")):
        c.kill("clusterd_compute_1")
class KillClusterdStorage
Expand source code Browse git
class KillClusterdStorage(MzcomposeAction):
    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()

        # Depending on the workload, clusterd may not be running, hence the || true
        c.exec("materialized", "bash", "-c", "kill -9 `pidof clusterd` || true")

Ancestors

Methods

def execute(self, e: Executor) ‑> None
Expand source code Browse git
def execute(self, e: Executor) -> None:
    c = e.mzcompose_composition()

    # Depending on the workload, clusterd may not be running, hence the || true
    c.exec("materialized", "bash", "-c", "kill -9 `pidof clusterd` || true")
class KillMz
Expand source code Browse git
class KillMz(MzcomposeAction):
    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()
        c.kill("materialized")

Ancestors

Methods

def execute(self, e: Executor) ‑> None
Expand source code Browse git
def execute(self, e: Executor) -> None:
    c = e.mzcompose_composition()
    c.kill("materialized")
class MzcomposeAction
Expand source code Browse git
class MzcomposeAction(Action):
    pass

Ancestors

Subclasses

class RestartCockroach
Expand source code Browse git
class RestartCockroach(MzcomposeAction):
    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()

        c.kill("cockroach")
        c.up("cockroach")

Ancestors

Methods

def execute(self, e: Executor) ‑> None
Expand source code Browse git
def execute(self, e: Executor) -> None:
    c = e.mzcompose_composition()

    c.kill("cockroach")
    c.up("cockroach")
class RestartRedpandaDebezium

Restarts Redpanda and Debezium. Debezium is unable to survive Redpanda restarts so the two go together.

Expand source code Browse git
class RestartRedpandaDebezium(MzcomposeAction):
    """Restarts Redpanda and Debezium. Debezium is unable to survive Redpanda restarts so the two go together."""

    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()

        for service in ["redpanda", "debezium"]:
            c.kill(service)
            c.up(service)

Ancestors

Methods

def execute(self, e: Executor) ‑> None
Expand source code Browse git
def execute(self, e: Executor) -> None:
    c = e.mzcompose_composition()

    for service in ["redpanda", "debezium"]:
        c.kill(service)
        c.up(service)
class RestartSourcePostgres
Expand source code Browse git
class RestartSourcePostgres(MzcomposeAction):
    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()

        c.kill("postgres")
        c.up("postgres")

Ancestors

Methods

def execute(self, e: Executor) ‑> None
Expand source code Browse git
def execute(self, e: Executor) -> None:
    c = e.mzcompose_composition()

    c.kill("postgres")
    c.up("postgres")
class StartClusterdCompute (tag: Optional[str] = None)
Expand source code Browse git
class StartClusterdCompute(MzcomposeAction):
    def __init__(self, tag: Optional[str] = None) -> None:
        self.tag = tag

    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()

        clusterd = Clusterd(name="clusterd_compute_1")
        if self.tag:
            # TODO(benesch): remove this conditional once v0.39 ships.
            if any(self.tag.startswith(version) for version in ["v0.37", "v0.38"]):
                clusterd = Clusterd(
                    name="clusterd_compute_1",
                    image=f"materialize/clusterd:{self.tag}",
                    options=[
                        "--compute-controller-listen-addr=0.0.0.0:2101",
                        "--secrets-reader=process",
                        "--secrets-reader-process-dir=/mzdata/secrets",
                    ],
                    storage_workers=None,
                )
            else:
                clusterd = Clusterd(
                    name="clusterd_compute_1",
                    image=f"materialize/clusterd:{self.tag}",
                )
        print(f"Starting Compute using image {clusterd.config.get('image')}")

        with c.override(clusterd):
            c.up("clusterd_compute_1")

Ancestors

Methods

def execute(self, e: Executor) ‑> None
Expand source code Browse git
def execute(self, e: Executor) -> None:
    c = e.mzcompose_composition()

    clusterd = Clusterd(name="clusterd_compute_1")
    if self.tag:
        # TODO(benesch): remove this conditional once v0.39 ships.
        if any(self.tag.startswith(version) for version in ["v0.37", "v0.38"]):
            clusterd = Clusterd(
                name="clusterd_compute_1",
                image=f"materialize/clusterd:{self.tag}",
                options=[
                    "--compute-controller-listen-addr=0.0.0.0:2101",
                    "--secrets-reader=process",
                    "--secrets-reader-process-dir=/mzdata/secrets",
                ],
                storage_workers=None,
            )
        else:
            clusterd = Clusterd(
                name="clusterd_compute_1",
                image=f"materialize/clusterd:{self.tag}",
            )
    print(f"Starting Compute using image {clusterd.config.get('image')}")

    with c.override(clusterd):
        c.up("clusterd_compute_1")
class StartMz (tag: Optional[str] = None, environment_extra: List[str] = [])
Expand source code Browse git
class StartMz(MzcomposeAction):
    def __init__(
        self, tag: Optional[str] = None, environment_extra: List[str] = []
    ) -> None:
        self.tag = tag
        self.environment_extra = environment_extra

    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()

        image = f"materialize/materialized:{self.tag}" if self.tag is not None else None
        print(f"Starting Mz using image {image}")
        mz = Materialized(
            image=image,
            external_cockroach=True,
            environment_extra=self.environment_extra,
        )

        with c.override(mz):
            c.up("materialized")

        for config_param in ["max_tables", "max_sources"]:
            c.sql(
                f"ALTER SYSTEM SET {config_param} TO 1000",
                user="mz_system",
                port=6877,
            )

Ancestors

Methods

def execute(self, e: Executor) ‑> None
Expand source code Browse git
def execute(self, e: Executor) -> None:
    c = e.mzcompose_composition()

    image = f"materialize/materialized:{self.tag}" if self.tag is not None else None
    print(f"Starting Mz using image {image}")
    mz = Materialized(
        image=image,
        external_cockroach=True,
        environment_extra=self.environment_extra,
    )

    with c.override(mz):
        c.up("materialized")

    for config_param in ["max_tables", "max_sources"]:
        c.sql(
            f"ALTER SYSTEM SET {config_param} TO 1000",
            user="mz_system",
            port=6877,
        )
class UseClusterdCompute
Expand source code Browse git
class UseClusterdCompute(MzcomposeAction):
    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()

        c.sql(
            """
            DROP CLUSTER REPLICA default.r1;
            CREATE CLUSTER REPLICA default.r1
                STORAGECTL ADDRESS 'clusterd_compute_1:2100',
                COMPUTECTL ADDRESSES ['clusterd_compute_1:2101'],
                COMPUTE ADDRESSES ['clusterd_compute_1:2102'],
                WORKERS 1;
        """
        )

Ancestors

Methods

def execute(self, e: Executor) ‑> None
Expand source code Browse git
def execute(self, e: Executor) -> None:
    c = e.mzcompose_composition()

    c.sql(
        """
        DROP CLUSTER REPLICA default.r1;
        CREATE CLUSTER REPLICA default.r1
            STORAGECTL ADDRESS 'clusterd_compute_1:2100',
            COMPUTECTL ADDRESSES ['clusterd_compute_1:2101'],
            COMPUTE ADDRESSES ['clusterd_compute_1:2102'],
            WORKERS 1;
    """
    )