Module materialize.zippy.source_actions

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import random
from textwrap import dedent

from materialize.mzcompose.composition import Composition
from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
from materialize.zippy.framework import Action, ActionFactory, Capabilities, Capability
from materialize.zippy.kafka_capabilities import Envelope, KafkaRunning, TopicExists
from materialize.zippy.mz_capabilities import MzIsRunning
from materialize.zippy.replica_capabilities import source_capable_clusters
from materialize.zippy.source_capabilities import SourceExists
from materialize.zippy.storaged_capabilities import StoragedRunning


class CreateSourceParameterized(ActionFactory):
    """Creates a source in Materialized."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {
            BalancerdIsRunning,
            MzIsRunning,
            StoragedRunning,
            KafkaRunning,
            TopicExists,
        }

    def __init__(self, max_sources: int = 10) -> None:
        self.max_sources = max_sources

    def new(self, capabilities: Capabilities) -> list[Action]:
        new_source_name = capabilities.get_free_capability_name(
            SourceExists, self.max_sources
        )

        if new_source_name:
            return [
                CreateSource(
                    capabilities=capabilities,
                    source=SourceExists(
                        name=new_source_name,
                        topic=random.choice(capabilities.get(TopicExists)),
                        cluster_name=random.choice(
                            source_capable_clusters(capabilities)
                        ),
                        uses_ssh_tunnel=random.choice([True, False]),
                    ),
                )
            ]
        else:
            return []


class AlterSourceConnectionParameterized(ActionFactory):
    """Alters a source in Materialized."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {MzIsRunning, StoragedRunning, KafkaRunning, TopicExists, SourceExists}

    def new(self, capabilities: Capabilities) -> list[Action]:
        existing_source_exists = capabilities.get(
            SourceExists,
        )

        return [
            AlterSourceConnection(
                capabilities=capabilities,
                source=source_exists,
            )
            for source_exists in existing_source_exists
        ]


class CreateSource(Action):
    def __init__(self, capabilities: Capabilities, source: SourceExists) -> None:
        self.source = source
        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        envelope = str(self.source.topic.envelope).split(".")[1]
        kafka_connection_name = f"{self.source.name}_kafka_conn"
        c.testdrive(
            dedent(
                f"""
                > CREATE CONNECTION IF NOT EXISTS {self.source.name}_csr_conn
                  TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}');

                > CREATE CONNECTION IF NOT EXISTS {kafka_connection_name}
                  TO KAFKA (BROKER '${{testdrive.kafka-addr}}' {'USING SSH TUNNEL zippy_ssh' if self.source.uses_ssh_tunnel else ''}, SECURITY PROTOCOL PLAINTEXT);

                > CREATE SOURCE {self.source.name}
                  IN CLUSTER {self.source.cluster_name}
                  FROM KAFKA CONNECTION {kafka_connection_name}
                  (TOPIC 'testdrive-{self.source.topic.name}-${{testdrive.seed}}')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {self.source.name}_csr_conn
                  ENVELOPE {envelope}
                """
            )
        )

    def provides(self) -> list[Capability]:
        return [self.source]


class AlterSourceConnection(Action):
    def __init__(self, capabilities: Capabilities, source: SourceExists) -> None:
        self.source = source
        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        # This flips the usage of the SSH tunnel.
        self.flip_usage_of_ssh_tunnel(
            c, new_use_ssh_status=not self.source.uses_ssh_tunnel
        )
        self.source.uses_ssh_tunnel = not self.source.uses_ssh_tunnel

    def flip_usage_of_ssh_tunnel(
        self, c: Composition, new_use_ssh_status: bool
    ) -> None:
        kafka_connection_name = f"{self.source.name}_kafka_conn"

        if self.source.topic.envelope == Envelope.UPSERT:
            # TODO: #25417 (alter upsert source)
            return

        c.testdrive(
            dedent(
                f"""
                > ALTER CONNECTION {kafka_connection_name} SET (BROKER '${{testdrive.kafka-addr}}'
                  {'USING SSH TUNNEL zippy_ssh' if new_use_ssh_status else ''});
                """
            )
        )

    def provides(self) -> list[Capability]:
        return []

Classes

class AlterSourceConnection (capabilities: Capabilities, source: SourceExists)

Base class for an action that a Zippy test can take.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class AlterSourceConnection(Action):
    def __init__(self, capabilities: Capabilities, source: SourceExists) -> None:
        self.source = source
        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        # This flips the usage of the SSH tunnel.
        self.flip_usage_of_ssh_tunnel(
            c, new_use_ssh_status=not self.source.uses_ssh_tunnel
        )
        self.source.uses_ssh_tunnel = not self.source.uses_ssh_tunnel

    def flip_usage_of_ssh_tunnel(
        self, c: Composition, new_use_ssh_status: bool
    ) -> None:
        kafka_connection_name = f"{self.source.name}_kafka_conn"

        if self.source.topic.envelope == Envelope.UPSERT:
            # TODO: #25417 (alter upsert source)
            return

        c.testdrive(
            dedent(
                f"""
                > ALTER CONNECTION {kafka_connection_name} SET (BROKER '${{testdrive.kafka-addr}}'
                  {'USING SSH TUNNEL zippy_ssh' if new_use_ssh_status else ''});
                """
            )
        )

    def provides(self) -> list[Capability]:
        return []

Ancestors

Methods

def flip_usage_of_ssh_tunnel(self, c: Composition, new_use_ssh_status: bool) ‑> None
Expand source code Browse git
def flip_usage_of_ssh_tunnel(
    self, c: Composition, new_use_ssh_status: bool
) -> None:
    kafka_connection_name = f"{self.source.name}_kafka_conn"

    if self.source.topic.envelope == Envelope.UPSERT:
        # TODO: #25417 (alter upsert source)
        return

    c.testdrive(
        dedent(
            f"""
            > ALTER CONNECTION {kafka_connection_name} SET (BROKER '${{testdrive.kafka-addr}}'
              {'USING SSH TUNNEL zippy_ssh' if new_use_ssh_status else ''});
            """
        )
    )

Inherited members

class AlterSourceConnectionParameterized

Alters a source in Materialized.

Expand source code Browse git
class AlterSourceConnectionParameterized(ActionFactory):
    """Alters a source in Materialized."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {MzIsRunning, StoragedRunning, KafkaRunning, TopicExists, SourceExists}

    def new(self, capabilities: Capabilities) -> list[Action]:
        existing_source_exists = capabilities.get(
            SourceExists,
        )

        return [
            AlterSourceConnection(
                capabilities=capabilities,
                source=source_exists,
            )
            for source_exists in existing_source_exists
        ]

Ancestors

Methods

def new(self, capabilities: Capabilities) ‑> list[Action]
Expand source code Browse git
def new(self, capabilities: Capabilities) -> list[Action]:
    existing_source_exists = capabilities.get(
        SourceExists,
    )

    return [
        AlterSourceConnection(
            capabilities=capabilities,
            source=source_exists,
        )
        for source_exists in existing_source_exists
    ]

Inherited members

class CreateSource (capabilities: Capabilities, source: SourceExists)

Base class for an action that a Zippy test can take.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class CreateSource(Action):
    def __init__(self, capabilities: Capabilities, source: SourceExists) -> None:
        self.source = source
        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        envelope = str(self.source.topic.envelope).split(".")[1]
        kafka_connection_name = f"{self.source.name}_kafka_conn"
        c.testdrive(
            dedent(
                f"""
                > CREATE CONNECTION IF NOT EXISTS {self.source.name}_csr_conn
                  TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}');

                > CREATE CONNECTION IF NOT EXISTS {kafka_connection_name}
                  TO KAFKA (BROKER '${{testdrive.kafka-addr}}' {'USING SSH TUNNEL zippy_ssh' if self.source.uses_ssh_tunnel else ''}, SECURITY PROTOCOL PLAINTEXT);

                > CREATE SOURCE {self.source.name}
                  IN CLUSTER {self.source.cluster_name}
                  FROM KAFKA CONNECTION {kafka_connection_name}
                  (TOPIC 'testdrive-{self.source.topic.name}-${{testdrive.seed}}')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {self.source.name}_csr_conn
                  ENVELOPE {envelope}
                """
            )
        )

    def provides(self) -> list[Capability]:
        return [self.source]

Ancestors

Inherited members

class CreateSourceParameterized (max_sources: int = 10)

Creates a source in Materialized.

Expand source code Browse git
class CreateSourceParameterized(ActionFactory):
    """Creates a source in Materialized."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {
            BalancerdIsRunning,
            MzIsRunning,
            StoragedRunning,
            KafkaRunning,
            TopicExists,
        }

    def __init__(self, max_sources: int = 10) -> None:
        self.max_sources = max_sources

    def new(self, capabilities: Capabilities) -> list[Action]:
        new_source_name = capabilities.get_free_capability_name(
            SourceExists, self.max_sources
        )

        if new_source_name:
            return [
                CreateSource(
                    capabilities=capabilities,
                    source=SourceExists(
                        name=new_source_name,
                        topic=random.choice(capabilities.get(TopicExists)),
                        cluster_name=random.choice(
                            source_capable_clusters(capabilities)
                        ),
                        uses_ssh_tunnel=random.choice([True, False]),
                    ),
                )
            ]
        else:
            return []

Ancestors

Methods

def new(self, capabilities: Capabilities) ‑> list[Action]
Expand source code Browse git
def new(self, capabilities: Capabilities) -> list[Action]:
    new_source_name = capabilities.get_free_capability_name(
        SourceExists, self.max_sources
    )

    if new_source_name:
        return [
            CreateSource(
                capabilities=capabilities,
                source=SourceExists(
                    name=new_source_name,
                    topic=random.choice(capabilities.get(TopicExists)),
                    cluster_name=random.choice(
                        source_capable_clusters(capabilities)
                    ),
                    uses_ssh_tunnel=random.choice([True, False]),
                ),
            )
        ]
    else:
        return []

Inherited members