Module materialize.zippy.kafka_actions

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import random
import string
import threading
from textwrap import dedent

import numpy as np

from materialize.mzcompose.composition import Composition
from materialize.zippy.framework import Action, ActionFactory, Capabilities, Capability
from materialize.zippy.kafka_capabilities import Envelope, KafkaRunning, TopicExists
from materialize.zippy.mz_capabilities import MzIsRunning

# Testdrive preamble prepended to every testdrive script built in this module.
# It defines two Avro schemas that the `$ kafka-ingest` commands reference:
#   ${keyschema} - record "test" with a single `long` field "key";
#   ${schema}    - record "test" with a `long` field "f1" and a `string` "pad".
SCHEMA = """
$ set keyschema={
        "type" : "record",
        "name" : "test",
        "fields" : [
            {"name":"key", "type":"long"}
        ]
    }

$ set schema={
        "type" : "record",
        "name" : "test",
        "fields" : [
            {"name":"f1", "type":"long"},
            {"name":"pad", "type":"string"}
        ]
    }
"""


class KafkaStart(Action):
    """Start a Kafka instance."""

    def provides(self) -> list[Capability]:
        """Advertise the KafkaRunning capability once redpanda is up."""
        running = KafkaRunning()
        return [running]

    def run(self, c: Composition) -> None:
        """Bring up the ``redpanda`` service of the composition."""
        service = "redpanda"
        c.up(service)


class KafkaStop(Action):
    """Stop the Kafka instance."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Kafka has to be running before it can be stopped."""
        return {KafkaRunning}

    def withholds(self) -> set[type[Capability]]:
        """KafkaRunning no longer holds once redpanda has been killed."""
        return {KafkaRunning}

    def run(self, c: Composition) -> None:
        """Kill the ``redpanda`` service of the composition."""
        service = "redpanda"
        c.kill(service)


class CreateTopicParameterized(ActionFactory):
    """Creates a Kafka topic and decides on the envelope that will be used."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {MzIsRunning, KafkaRunning}

    def __init__(
        self,
        max_topics: int = 10,
        envelopes_with_weights: dict[Envelope, int] = {
            Envelope.NONE: 25,
            Envelope.UPSERT: 75,
        },
    ) -> None:
        """Configure the topic factory.

        Args:
            max_topics: passed to ``get_free_capability_name`` as the upper
                bound on the number of ``TopicExists`` names to hand out.
            envelopes_with_weights: relative weights used with
                ``random.choices`` to pick each new topic's envelope.
        """
        self.max_topics = max_topics
        # The default dict is evaluated once, when the function is defined,
        # and would otherwise be shared by every factory instance (and aliased
        # to a caller-supplied dict). Store a private copy so that mutating one
        # factory's weights can never leak into others or back to the caller.
        self.envelopes_with_weights = dict(envelopes_with_weights)

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Return a single CreateTopic action, or nothing if no name is free.

        The envelope is drawn according to ``envelopes_with_weights`` and the
        partition count is uniform in ``[1, 10]``.
        """
        new_topic_name = capabilities.get_free_capability_name(
            TopicExists, self.max_topics
        )

        if new_topic_name:
            return [
                CreateTopic(
                    capabilities=capabilities,
                    topic=TopicExists(
                        name=new_topic_name,
                        envelope=random.choices(
                            list(self.envelopes_with_weights.keys()),
                            weights=list(self.envelopes_with_weights.values()),
                        )[0],
                        partitions=random.randint(1, 10),
                    ),
                )
            ]
        else:
            return []


class CreateTopic(Action):
    """Create a Kafka topic and seed it with a single record (key 0)."""

    def __init__(self, capabilities: Capabilities, topic: TopicExists) -> None:
        # The topic is recorded before delegating to Action.__init__.
        self.topic = topic
        super().__init__(capabilities)

    def provides(self) -> list[Capability]:
        """The topic described by ``self.topic`` exists after this action."""
        return [self.topic]

    def run(self, c: Composition) -> None:
        """Create the topic with its partition count and ingest one record."""
        name = self.topic.name
        partitions = self.topic.partitions
        script = dedent(
            f"""
            $ kafka-create-topic topic={name} partitions={partitions}
            $ kafka-ingest format=avro key-format=avro topic={name} schema=${{schema}} key-schema=${{keyschema}} repeat=1
            {{"key": 0}} {{"f1": 0, "pad": ""}}
            """
        )
        c.testdrive(SCHEMA + script)


class Ingest(Action):
    """Ingests data (inserts, updates or deletions) into a Kafka topic."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {MzIsRunning, KafkaRunning, TopicExists}

    def __init__(self, capabilities: Capabilities) -> None:
        """Pick the target topic, batch size and pad value for this action.

        Sets:
            topic: a randomly chosen existing ``TopicExists`` capability.
            delta: number of records to act on, uniform in ``[1, 10000]``.
            pad: one random ASCII letter repeated ``n`` times, where ``n`` is
                a Zipf(1.6) draw (always >= 1) clamped to 256.
        """
        self.topic = random.choice(capabilities.get(TopicExists))
        self.delta = random.randint(1, 10000)
        # Heavy-tailed pad length: Zipf draws are mostly small with rare
        # large outliers, clamped to 256 bytes.
        # `random.seed()` does not affect NumPy's module-global `np.random`
        # state, so drawing from it directly makes pad lengths
        # non-reproducible for a given Python seed. Derive a NumPy Generator
        # from Python's `random` instead so the whole action is reproducible.
        rng = np.random.default_rng(random.getrandbits(64))
        # int() so that string repetition receives a plain Python int.
        pad_length = int(min(rng.zipf(1.6), 256))
        self.pad = pad_length * random.choice(string.ascii_letters)
        super().__init__(capabilities)

    def __str__(self) -> str:
        return f"{Action.__str__(self)} {self.topic.name}"


class KafkaInsert(Ingest):
    """Inserts data into a Kafka topic."""

    def parallel(self) -> bool:
        """Run the ingestion on the calling thread (overridden by subclasses)."""
        return False

    def run(self, c: Composition) -> None:
        """Append ``self.delta`` records whose keys continue above the max watermark."""
        watermarks = self.topic.watermarks
        first_key = watermarks.max + 1
        watermarks.max += self.delta
        assert watermarks.max >= 0
        assert watermarks.min >= 0

        script = SCHEMA + dedent(
            f"""
            $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} start-iteration={first_key} repeat={self.delta}
            {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad" : "{self.pad}"}}
            """
        )

        if not self.parallel():
            c.testdrive(script)
            return

        # The background thread is started but never joined here; the
        # watermark above has already been advanced.
        threading.Thread(target=c.testdrive, args=[script]).start()


class KafkaInsertParallel(KafkaInsert):
    """Inserts data into a Kafka topic using background threads."""

    @classmethod
    def require_explicit_mention(cls) -> bool:
        """Opt out of default selection (presumably a scenario must name it)."""
        return True

    def parallel(self) -> bool:
        """Always hand the ingestion to a background thread."""
        return True


class KafkaUpsertFromHead(Ingest):
    """Updates records from the head in-place by modifying their pad"""

    def run(self, c: Composition) -> None:
        """Re-ingest up to ``self.delta`` of the highest live keys with a new pad.

        Live keys span ``[watermarks.min, watermarks.max]`` inclusive: inserts
        start at ``max + 1`` and deletes never remove the key left at the new
        ``max``. The updated window is therefore ``[start, head]``, including
        the head record. Watermarks are not changed because no key is added or
        removed. Topics with ``Envelope.NONE`` are skipped.
        """
        if self.topic.envelope is Envelope.NONE:
            return

        head = self.topic.watermarks.max
        # `+ 1` so the window ends *at* the head key rather than just below
        # it; otherwise the most recently inserted record is never updated.
        start = max(head - self.delta + 1, self.topic.watermarks.min)
        actual_delta = head - start + 1

        if actual_delta > 0:
            c.testdrive(
                SCHEMA
                + dedent(
                    f"""
                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={start} repeat={actual_delta}
                    {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
                    """
                )
            )


class KafkaDeleteFromHead(Ingest):
    """Deletes the largest values previously inserted."""

    def run(self, c: Composition) -> None:
        """Send key-only records for up to ``self.delta`` keys at the top of the range.

        The max watermark is lowered first, never below the min watermark, and
        keys ``(new_max, old_max]`` are then ingested without a value. Topics
        with ``Envelope.NONE`` are skipped.
        """
        if self.topic.envelope is Envelope.NONE:
            return

        marks = self.topic.watermarks
        old_max = marks.max
        marks.max = max(old_max - self.delta, marks.min)
        assert marks.max >= 0
        assert marks.min >= 0

        removed = old_max - marks.max
        if removed <= 0:
            return

        script = dedent(
            f"""
            $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={marks.max + 1} repeat={removed}
            {{"key": ${{kafka-ingest.iteration}}}}
            """
        )
        c.testdrive(SCHEMA + script)


class KafkaUpsertFromTail(Ingest):
    """Updates records from the tail in-place by modifying their pad"""

    def run(self, c: Composition) -> None:
        """Re-ingest keys starting at the min watermark with a new pad.

        Keys ``[min, min(min + delta, max))`` are written again; watermarks are
        left unchanged. Topics with ``Envelope.NONE`` are skipped.
        """
        if self.topic.envelope is Envelope.NONE:
            return

        marks = self.topic.watermarks
        first = marks.min
        stop = min(first + self.delta, marks.max)
        count = stop - first
        if count <= 0:
            return

        script = dedent(
            f"""
            $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={first} repeat={count}
            {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
            """
        )
        c.testdrive(SCHEMA + script)


class KafkaDeleteFromTail(Ingest):
    """Deletes the smallest values previously inserted."""

    def run(self, c: Composition) -> None:
        """Send key-only records for up to ``self.delta`` keys at the bottom of the range.

        The min watermark is raised first, never above the max watermark, and
        keys ``[old_min, new_min)`` are then ingested without a value. Topics
        with ``Envelope.NONE`` are skipped.
        """
        if self.topic.envelope is Envelope.NONE:
            return

        marks = self.topic.watermarks
        old_min = marks.min
        marks.min = min(old_min + self.delta, marks.max)
        assert marks.max >= 0
        assert marks.min >= 0
        removed = marks.min - old_min

        if removed <= 0:
            return

        script = dedent(
            f"""
            $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={old_min} repeat={removed}
            {{"key": ${{kafka-ingest.iteration}}}}
            """
        )
        c.testdrive(SCHEMA + script)

Classes

class CreateTopic (capabilities: Capabilities, topic: TopicExists)

Creates a Kafka topic with the chosen partition count and ingests a single seed record (key 0) into it.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class CreateTopic(Action):
    """Create a Kafka topic and seed it with a single record (key 0)."""

    def __init__(self, capabilities: Capabilities, topic: TopicExists) -> None:
        # The topic is recorded before delegating to Action.__init__.
        self.topic = topic
        super().__init__(capabilities)

    def provides(self) -> list[Capability]:
        """The topic described by ``self.topic`` exists after this action."""
        return [self.topic]

    def run(self, c: Composition) -> None:
        """Create the topic with its partition count and ingest one record."""
        name = self.topic.name
        partitions = self.topic.partitions
        script = dedent(
            f"""
            $ kafka-create-topic topic={name} partitions={partitions}
            $ kafka-ingest format=avro key-format=avro topic={name} schema=${{schema}} key-schema=${{keyschema}} repeat=1
            {{"key": 0}} {{"f1": 0, "pad": ""}}
            """
        )
        c.testdrive(SCHEMA + script)

Ancestors

Inherited members

class CreateTopicParameterized (max_topics: int = 10, envelopes_with_weights: dict[Envelope, int] = {Envelope.NONE: 25, Envelope.UPSERT: 75})

Creates a Kafka topic and decides on the envelope that will be used.

Expand source code Browse git
class CreateTopicParameterized(ActionFactory):
    """Creates a Kafka topic and decides on the envelope that will be used."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {MzIsRunning, KafkaRunning}

    def __init__(
        self,
        max_topics: int = 10,
        envelopes_with_weights: dict[Envelope, int] = {
            Envelope.NONE: 25,
            Envelope.UPSERT: 75,
        },
    ) -> None:
        """Configure the topic factory.

        Args:
            max_topics: passed to ``get_free_capability_name`` as the upper
                bound on the number of ``TopicExists`` names to hand out.
            envelopes_with_weights: relative weights used with
                ``random.choices`` to pick each new topic's envelope.
        """
        self.max_topics = max_topics
        # The default dict is evaluated once, when the function is defined,
        # and would otherwise be shared by every factory instance (and aliased
        # to a caller-supplied dict). Store a private copy so that mutating one
        # factory's weights can never leak into others or back to the caller.
        self.envelopes_with_weights = dict(envelopes_with_weights)

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Return a single CreateTopic action, or nothing if no name is free.

        The envelope is drawn according to ``envelopes_with_weights`` and the
        partition count is uniform in ``[1, 10]``.
        """
        new_topic_name = capabilities.get_free_capability_name(
            TopicExists, self.max_topics
        )

        if new_topic_name:
            return [
                CreateTopic(
                    capabilities=capabilities,
                    topic=TopicExists(
                        name=new_topic_name,
                        envelope=random.choices(
                            list(self.envelopes_with_weights.keys()),
                            weights=list(self.envelopes_with_weights.values()),
                        )[0],
                        partitions=random.randint(1, 10),
                    ),
                )
            ]
        else:
            return []

Ancestors

Methods

def new(self, capabilities: Capabilities) -> list[Action]
Expand source code Browse git
def new(self, capabilities: Capabilities) -> list[Action]:
    """Propose creating one more topic if a free topic name is available.

    ``get_free_capability_name`` is asked for an unused ``TopicExists`` name,
    bounded by ``self.max_topics``. If it returns a falsy value, no action is
    proposed. Otherwise the envelope is drawn by weight from
    ``self.envelopes_with_weights`` and the partition count is uniform in
    ``[1, 10]``.
    """
    topic_name = capabilities.get_free_capability_name(TopicExists, self.max_topics)
    if not topic_name:
        return []

    envelopes = list(self.envelopes_with_weights.keys())
    weights = list(self.envelopes_with_weights.values())
    # Draw order (envelope, then partitions) matches the original keyword order.
    chosen_envelope = random.choices(envelopes, weights=weights)[0]
    topic = TopicExists(
        name=topic_name,
        envelope=chosen_envelope,
        partitions=random.randint(1, 10),
    )
    return [CreateTopic(capabilities=capabilities, topic=topic)]

Inherited members

class Ingest (capabilities: Capabilities)

Ingests data (inserts, updates or deletions) into a Kafka topic.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class Ingest(Action):
    """Ingests data (inserts, updates or deletions) into a Kafka topic."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {MzIsRunning, KafkaRunning, TopicExists}

    def __init__(self, capabilities: Capabilities) -> None:
        """Pick the target topic, batch size and pad value for this action.

        Sets:
            topic: a randomly chosen existing ``TopicExists`` capability.
            delta: number of records to act on, uniform in ``[1, 10000]``.
            pad: one random ASCII letter repeated ``n`` times, where ``n`` is
                a Zipf(1.6) draw (always >= 1) clamped to 256.
        """
        self.topic = random.choice(capabilities.get(TopicExists))
        self.delta = random.randint(1, 10000)
        # Heavy-tailed pad length: Zipf draws are mostly small with rare
        # large outliers, clamped to 256 bytes.
        # `random.seed()` does not affect NumPy's module-global `np.random`
        # state, so drawing from it directly makes pad lengths
        # non-reproducible for a given Python seed. Derive a NumPy Generator
        # from Python's `random` instead so the whole action is reproducible.
        rng = np.random.default_rng(random.getrandbits(64))
        # int() so that string repetition receives a plain Python int.
        pad_length = int(min(rng.zipf(1.6), 256))
        self.pad = pad_length * random.choice(string.ascii_letters)
        super().__init__(capabilities)

    def __str__(self) -> str:
        return f"{Action.__str__(self)} {self.topic.name}"

Ancestors

Subclasses

Inherited members

class KafkaDeleteFromHead (capabilities: Capabilities)

Deletes the largest values previously inserted.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class KafkaDeleteFromHead(Ingest):
    """Deletes the largest values previously inserted."""

    def run(self, c: Composition) -> None:
        """Send key-only records for up to ``self.delta`` keys at the top of the range.

        The max watermark is lowered first, never below the min watermark, and
        keys ``(new_max, old_max]`` are then ingested without a value. Topics
        with ``Envelope.NONE`` are skipped.
        """
        if self.topic.envelope is Envelope.NONE:
            return

        marks = self.topic.watermarks
        old_max = marks.max
        marks.max = max(old_max - self.delta, marks.min)
        assert marks.max >= 0
        assert marks.min >= 0

        removed = old_max - marks.max
        if removed <= 0:
            return

        script = dedent(
            f"""
            $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={marks.max + 1} repeat={removed}
            {{"key": ${{kafka-ingest.iteration}}}}
            """
        )
        c.testdrive(SCHEMA + script)

Ancestors

Inherited members

class KafkaDeleteFromTail (capabilities: Capabilities)

Deletes the smallest values previously inserted.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class KafkaDeleteFromTail(Ingest):
    """Deletes the smallest values previously inserted."""

    def run(self, c: Composition) -> None:
        """Send key-only records for up to ``self.delta`` keys at the bottom of the range.

        The min watermark is raised first, never above the max watermark, and
        keys ``[old_min, new_min)`` are then ingested without a value. Topics
        with ``Envelope.NONE`` are skipped.
        """
        if self.topic.envelope is Envelope.NONE:
            return

        marks = self.topic.watermarks
        old_min = marks.min
        marks.min = min(old_min + self.delta, marks.max)
        assert marks.max >= 0
        assert marks.min >= 0
        removed = marks.min - old_min

        if removed <= 0:
            return

        script = dedent(
            f"""
            $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={old_min} repeat={removed}
            {{"key": ${{kafka-ingest.iteration}}}}
            """
        )
        c.testdrive(SCHEMA + script)

Ancestors

Inherited members

class KafkaInsert (capabilities: Capabilities)

Inserts data into a Kafka topic.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class KafkaInsert(Ingest):
    """Inserts data into a Kafka topic."""

    def parallel(self) -> bool:
        """Run the ingestion on the calling thread (overridden by subclasses)."""
        return False

    def run(self, c: Composition) -> None:
        """Append ``self.delta`` records whose keys continue above the max watermark."""
        watermarks = self.topic.watermarks
        first_key = watermarks.max + 1
        watermarks.max += self.delta
        assert watermarks.max >= 0
        assert watermarks.min >= 0

        script = SCHEMA + dedent(
            f"""
            $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} start-iteration={first_key} repeat={self.delta}
            {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad" : "{self.pad}"}}
            """
        )

        if not self.parallel():
            c.testdrive(script)
            return

        # The background thread is started but never joined here; the
        # watermark above has already been advanced.
        threading.Thread(target=c.testdrive, args=[script]).start()

Ancestors

Subclasses

Methods

def parallel(self) -> bool
Expand source code Browse git
def parallel(self) -> bool:
    """Report that ingestion runs synchronously on the calling thread."""
    run_in_background = False
    return run_in_background

Inherited members

class KafkaInsertParallel (capabilities: Capabilities)

Inserts data into a Kafka topic using background threads.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class KafkaInsertParallel(KafkaInsert):
    """Inserts data into a Kafka topic using background threads."""

    @classmethod
    def require_explicit_mention(cls) -> bool:
        """Opt out of default selection (presumably a scenario must name it)."""
        return True

    def parallel(self) -> bool:
        """Always hand the ingestion to a background thread."""
        return True

Ancestors

Methods

def parallel(self) -> bool
Expand source code Browse git
def parallel(self) -> bool:
    """Report that ingestion is handed off to a background thread."""
    run_in_background = True
    return run_in_background

Inherited members

class KafkaStart (capabilities: Capabilities)

Start a Kafka instance.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class KafkaStart(Action):
    """Start a Kafka instance."""

    def provides(self) -> list[Capability]:
        """Advertise the KafkaRunning capability once redpanda is up."""
        running = KafkaRunning()
        return [running]

    def run(self, c: Composition) -> None:
        """Bring up the ``redpanda`` service of the composition."""
        service = "redpanda"
        c.up(service)

Ancestors

Inherited members

class KafkaStop (capabilities: Capabilities)

Stop the Kafka instance.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class KafkaStop(Action):
    """Stop the Kafka instance."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Kafka has to be running before it can be stopped."""
        return {KafkaRunning}

    def withholds(self) -> set[type[Capability]]:
        """KafkaRunning no longer holds once redpanda has been killed."""
        return {KafkaRunning}

    def run(self, c: Composition) -> None:
        """Kill the ``redpanda`` service of the composition."""
        service = "redpanda"
        c.kill(service)

Ancestors

Inherited members

class KafkaUpsertFromHead (capabilities: Capabilities)

Updates records from the head in place by modifying their pad.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class KafkaUpsertFromHead(Ingest):
    """Updates records from the head in-place by modifying their pad"""

    def run(self, c: Composition) -> None:
        """Re-ingest up to ``self.delta`` of the highest live keys with a new pad.

        Live keys span ``[watermarks.min, watermarks.max]`` inclusive: inserts
        start at ``max + 1`` and deletes never remove the key left at the new
        ``max``. The updated window is therefore ``[start, head]``, including
        the head record. Watermarks are not changed because no key is added or
        removed. Topics with ``Envelope.NONE`` are skipped.
        """
        if self.topic.envelope is Envelope.NONE:
            return

        head = self.topic.watermarks.max
        # `+ 1` so the window ends *at* the head key rather than just below
        # it; otherwise the most recently inserted record is never updated.
        start = max(head - self.delta + 1, self.topic.watermarks.min)
        actual_delta = head - start + 1

        if actual_delta > 0:
            c.testdrive(
                SCHEMA
                + dedent(
                    f"""
                    $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={start} repeat={actual_delta}
                    {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
                    """
                )
            )

Ancestors

Inherited members

class KafkaUpsertFromTail (capabilities: Capabilities)

Updates records from the tail in place by modifying their pad.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class KafkaUpsertFromTail(Ingest):
    """Updates records from the tail in-place by modifying their pad"""

    def run(self, c: Composition) -> None:
        """Re-ingest keys starting at the min watermark with a new pad.

        Keys ``[min, min(min + delta, max))`` are written again; watermarks are
        left unchanged. Topics with ``Envelope.NONE`` are skipped.
        """
        if self.topic.envelope is Envelope.NONE:
            return

        marks = self.topic.watermarks
        first = marks.min
        stop = min(first + self.delta, marks.max)
        count = stop - first
        if count <= 0:
            return

        script = dedent(
            f"""
            $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={first} repeat={count}
            {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
            """
        )
        c.testdrive(SCHEMA + script)

Ancestors

Inherited members