Module materialize.zippy.replica_actions

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import random
from textwrap import dedent

from materialize.mzcompose.composition import Composition
from materialize.zippy.framework import Action, Capabilities, Capability
from materialize.zippy.mz_capabilities import MzIsRunning
from materialize.zippy.replica_capabilities import ReplicaExists, ReplicaSizeType


class DropDefaultReplica(Action):
    """Drops the default replica.

    The action switches the ``quickstart`` cluster to unmanaged mode and then
    drops its ``r1`` replica.
    """

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capabilities this action depends on (a running Mz)."""
        return {MzIsRunning}

    def run(self, c: Composition) -> None:
        """Execute the ALTER/DROP statements through testdrive as mz_system."""
        # The statements are issued over the mz_system connection: the default
        # cluster is not owned by the materialize user, so that user cannot
        # drop anything from it while enable_rbac_checks is on.
        # NOTE(review): the cluster is presumably made unmanaged first because
        # replicas of a managed cluster cannot be dropped one by one - confirm.
        script = dedent(
            """
            $ postgres-execute connection=postgres://mz_system:materialize@materialized:6877
            ALTER CLUSTER quickstart SET (MANAGED = false)
            DROP CLUSTER REPLICA quickstart.r1
            """
        )
        c.testdrive(script)


class CreateReplica(Action):
    """Creates a replica on the quickstart cluster.

    A name is drawn at random from ``replica1`` .. ``replica4``. When no
    tracked ``ReplicaExists`` capability carries that name, a replica with a
    randomly chosen size is created and reported through ``provides()``;
    when the name is already tracked, the action does nothing.
    """

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capabilities this action depends on (a running Mz)."""
        return {MzIsRunning}

    def __init__(self, capabilities: Capabilities) -> None:
        """Pick a replica name and decide whether it still has to be created.

        Sets ``self.new_replica`` (whether ``run()`` must create anything) and
        ``self.replica`` (the capability describing the chosen replica).
        """
        candidate = ReplicaExists(name=f"replica{random.randint(1, 4)}")

        same_name = [
            known
            for known in capabilities.get(ReplicaExists)
            if known.name == candidate.name
        ]
        match_count = len(same_name)

        if match_count == 1:
            # The name is already tracked: reuse that capability, run() and
            # provides() then have nothing to do.
            self.new_replica = False
            self.replica = same_name[0]
        elif match_count == 0:
            self.new_replica = True

            kind = random.choice(
                [
                    ReplicaSizeType.Nodes,
                    ReplicaSizeType.Workers,
                    ReplicaSizeType.Both,
                ]
            )
            count = str(random.choice([2, 4]))

            # Build the size string: "<n>-1", "<n>" or "<n>-<n>".
            if kind is ReplicaSizeType.Workers:
                size = count
            elif kind is ReplicaSizeType.Both:
                size = f"{count}-{count}"
            elif kind is ReplicaSizeType.Nodes:
                size = f"{count}-1"
            else:
                assert False

            # "1-1" is spelled as plain "1". With the counts drawn above
            # (2 or 4) this value cannot currently occur.
            candidate.size = "1" if size == "1-1" else size

            self.replica = candidate
        else:
            # More than one tracked replica with the same name is unexpected.
            assert False

        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        """Create the replica through testdrive, unless it is already tracked."""
        if not self.new_replica:
            return

        # The statement is issued over the mz_system connection: the default
        # cluster is not owned by the materialize user, so that user cannot
        # add a replica to it while enable_rbac_checks is on.
        script = dedent(
            f"""
                $ postgres-execute connection=postgres://mz_system:materialize@materialized:6877
                CREATE CLUSTER REPLICA quickstart.{self.replica.name} SIZE '{self.replica.size}'
                """
        )
        c.testdrive(script)

    def provides(self) -> list[Capability]:
        """Return the new replica capability, or nothing if none was created."""
        if self.new_replica:
            return [self.replica]
        return []


class DropReplica(Action):
    """Drops a replica from the quickstart cluster.

    A randomly chosen tracked replica is dropped, but only while more than
    one ``ReplicaExists`` capability is tracked; otherwise the action does
    nothing.
    """

    # The replica selected for removal, or None when nothing will be dropped.
    replica: ReplicaExists | None

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capabilities this action depends on."""
        return {MzIsRunning, ReplicaExists}

    def __init__(self, capabilities: Capabilities) -> None:
        """Select the replica to drop and stop tracking its capability."""
        tracked = capabilities.get(ReplicaExists)

        if len(tracked) <= 1:
            # Never remove the last tracked replica.
            self.replica = None
        else:
            self.replica = random.choice(tracked)
            # The capability is withdrawn here, at construction time, before
            # run() issues the actual DROP.
            capabilities.remove_capability_instance(self.replica)

        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        """Drop the selected replica through testdrive, if one was selected."""
        if self.replica is None:
            return

        # The statement is issued over the mz_system connection: the default
        # cluster is not owned by the materialize user, so that user cannot
        # remove a replica from it while enable_rbac_checks is on.
        script = dedent(
            f"""
                $ postgres-execute connection=postgres://mz_system:materialize@materialized:6877
                DROP CLUSTER REPLICA IF EXISTS quickstart.{self.replica.name}
                """
        )
        c.testdrive(script)

Classes

class CreateReplica (capabilities: Capabilities)

Creates a replica on the quickstart cluster.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class CreateReplica(Action):
    """Creates a replica on the quickstart cluster.

    A name is drawn at random from ``replica1`` .. ``replica4``. When no
    tracked ``ReplicaExists`` capability carries that name, a replica with a
    randomly chosen size is created and reported through ``provides()``;
    when the name is already tracked, the action does nothing.
    """

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capabilities this action depends on (a running Mz)."""
        return {MzIsRunning}

    def __init__(self, capabilities: Capabilities) -> None:
        """Pick a replica name and decide whether it still has to be created.

        Sets ``self.new_replica`` (whether ``run()`` must create anything) and
        ``self.replica`` (the capability describing the chosen replica).
        """
        candidate = ReplicaExists(name=f"replica{random.randint(1, 4)}")

        same_name = [
            known
            for known in capabilities.get(ReplicaExists)
            if known.name == candidate.name
        ]
        match_count = len(same_name)

        if match_count == 1:
            # The name is already tracked: reuse that capability, run() and
            # provides() then have nothing to do.
            self.new_replica = False
            self.replica = same_name[0]
        elif match_count == 0:
            self.new_replica = True

            kind = random.choice(
                [
                    ReplicaSizeType.Nodes,
                    ReplicaSizeType.Workers,
                    ReplicaSizeType.Both,
                ]
            )
            count = str(random.choice([2, 4]))

            # Build the size string: "<n>-1", "<n>" or "<n>-<n>".
            if kind is ReplicaSizeType.Workers:
                size = count
            elif kind is ReplicaSizeType.Both:
                size = f"{count}-{count}"
            elif kind is ReplicaSizeType.Nodes:
                size = f"{count}-1"
            else:
                assert False

            # "1-1" is spelled as plain "1". With the counts drawn above
            # (2 or 4) this value cannot currently occur.
            candidate.size = "1" if size == "1-1" else size

            self.replica = candidate
        else:
            # More than one tracked replica with the same name is unexpected.
            assert False

        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        """Create the replica through testdrive, unless it is already tracked."""
        if not self.new_replica:
            return

        # The statement is issued over the mz_system connection: the default
        # cluster is not owned by the materialize user, so that user cannot
        # add a replica to it while enable_rbac_checks is on.
        script = dedent(
            f"""
                $ postgres-execute connection=postgres://mz_system:materialize@materialized:6877
                CREATE CLUSTER REPLICA quickstart.{self.replica.name} SIZE '{self.replica.size}'
                """
        )
        c.testdrive(script)

    def provides(self) -> list[Capability]:
        """Return the new replica capability, or nothing if none was created."""
        if self.new_replica:
            return [self.replica]
        return []

Ancestors

Inherited members

class DropDefaultReplica (capabilities: Capabilities)

Drops the default replica.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class DropDefaultReplica(Action):
    """Drops the default replica.

    The action switches the ``quickstart`` cluster to unmanaged mode and then
    drops its ``r1`` replica.
    """

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capabilities this action depends on (a running Mz)."""
        return {MzIsRunning}

    def run(self, c: Composition) -> None:
        """Execute the ALTER/DROP statements through testdrive as mz_system."""
        # The statements are issued over the mz_system connection: the default
        # cluster is not owned by the materialize user, so that user cannot
        # drop anything from it while enable_rbac_checks is on.
        # NOTE(review): the cluster is presumably made unmanaged first because
        # replicas of a managed cluster cannot be dropped one by one - confirm.
        script = dedent(
            """
            $ postgres-execute connection=postgres://mz_system:materialize@materialized:6877
            ALTER CLUSTER quickstart SET (MANAGED = false)
            DROP CLUSTER REPLICA quickstart.r1
            """
        )
        c.testdrive(script)

Ancestors

Inherited members

class DropReplica (capabilities: Capabilities)

Drops a replica from the quickstart cluster.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class DropReplica(Action):
    """Drops a replica from the quickstart cluster.

    A randomly chosen tracked replica is dropped, but only while more than
    one ``ReplicaExists`` capability is tracked; otherwise the action does
    nothing.
    """

    # The replica selected for removal, or None when nothing will be dropped.
    replica: ReplicaExists | None

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capabilities this action depends on."""
        return {MzIsRunning, ReplicaExists}

    def __init__(self, capabilities: Capabilities) -> None:
        """Select the replica to drop and stop tracking its capability."""
        tracked = capabilities.get(ReplicaExists)

        if len(tracked) <= 1:
            # Never remove the last tracked replica.
            self.replica = None
        else:
            self.replica = random.choice(tracked)
            # The capability is withdrawn here, at construction time, before
            # run() issues the actual DROP.
            capabilities.remove_capability_instance(self.replica)

        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        """Drop the selected replica through testdrive, if one was selected."""
        if self.replica is None:
            return

        # The statement is issued over the mz_system connection: the default
        # cluster is not owned by the materialize user, so that user cannot
        # remove a replica from it while enable_rbac_checks is on.
        script = dedent(
            f"""
                $ postgres-execute connection=postgres://mz_system:materialize@materialized:6877
                DROP CLUSTER REPLICA IF EXISTS quickstart.{self.replica.name}
                """
        )
        c.testdrive(script)

Ancestors

Class variables

var replica : ReplicaExists | None

Inherited members