Module materialize.zippy.kafka_actions
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import random
import string
import threading
from textwrap import dedent
import numpy as np
from materialize.mzcompose.composition import Composition
from materialize.zippy.framework import Action, ActionFactory, Capabilities, Capability
from materialize.zippy.kafka_capabilities import Envelope, KafkaRunning, TopicExists
from materialize.zippy.mz_capabilities import MzIsRunning
# Avro key/value schemas shared by every testdrive fragment in this module.
# The key is a single long "key"; the value carries a long "f1" plus a
# variable-length "pad" string that the Ingest actions use to vary record size.
SCHEMA = """
$ set keyschema={
"type" : "record",
"name" : "test",
"fields" : [
{"name":"key", "type":"long"}
]
}
$ set schema={
"type" : "record",
"name" : "test",
"fields" : [
{"name":"f1", "type":"long"},
{"name":"pad", "type":"string"}
]
}
"""
class KafkaStart(Action):
    """Bring up a Kafka-compatible broker (Redpanda)."""

    def provides(self) -> list[Capability]:
        # After this action runs, Kafka is available to subsequent actions.
        return [KafkaRunning()]

    def run(self, c: Composition) -> None:
        c.up("redpanda")
class KafkaStop(Action):
    """Tear down the running Kafka (Redpanda) instance."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        # Only makes sense once a broker is up.
        return {KafkaRunning}

    def withholds(self) -> set[type[Capability]]:
        # Stopping the broker revokes the KafkaRunning capability.
        return {KafkaRunning}

    def run(self, c: Composition) -> None:
        c.kill("redpanda")
class CreateTopicParameterized(ActionFactory):
    """Creates a Kafka topic and decides on the envelope that will be used."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {MzIsRunning, KafkaRunning}

    def __init__(
        self,
        max_topics: int = 10,
        envelopes_with_weights: dict[Envelope, int] | None = None,
    ) -> None:
        """Configure the factory.

        Args:
            max_topics: upper bound on concurrently existing topics.
            envelopes_with_weights: relative selection weight per envelope;
                defaults to 25% ENVELOPE NONE / 75% ENVELOPE UPSERT.
        """
        self.max_topics = max_topics
        # Avoid a mutable default argument (shared across all instances);
        # build the default mapping fresh for each factory instead.
        if envelopes_with_weights is None:
            envelopes_with_weights = {
                Envelope.NONE: 25,
                Envelope.UPSERT: 75,
            }
        self.envelopes_with_weights = envelopes_with_weights

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Return a CreateTopic action for a fresh topic name, or [] if at capacity."""
        new_topic_name = capabilities.get_free_capability_name(
            TopicExists, self.max_topics
        )
        if new_topic_name:
            return [
                CreateTopic(
                    capabilities=capabilities,
                    topic=TopicExists(
                        name=new_topic_name,
                        # Weighted random choice of the envelope this topic
                        # (and all sources created over it) will use.
                        envelope=random.choices(
                            list(self.envelopes_with_weights.keys()),
                            weights=list(self.envelopes_with_weights.values()),
                        )[0],
                        partitions=random.randint(1, 10),
                    ),
                )
            ]
        else:
            return []
class CreateTopic(Action):
    """Create the Kafka topic described by a TopicExists capability and seed it
    with a single record."""

    def __init__(self, capabilities: Capabilities, topic: TopicExists) -> None:
        self.topic = topic
        super().__init__(capabilities)

    def provides(self) -> list[Capability]:
        # The topic now exists and can be consumed by other actions.
        return [self.topic]

    def run(self, c: Composition) -> None:
        script = SCHEMA + dedent(
            f"""
            $ kafka-create-topic topic={self.topic.name} partitions={self.topic.partitions}
            $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} repeat=1
            {{"key": 0}} {{"f1": 0, "pad": ""}}
            """
        )
        c.testdrive(script)
class Ingest(Action):
    """Ingests data (inserts, updates or deletions) into a Kafka topic."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {MzIsRunning, KafkaRunning, TopicExists}

    def __init__(self, capabilities: Capabilities) -> None:
        self.topic = random.choice(capabilities.get(TopicExists))
        self.delta = random.randint(1, 10000)
        # Zipf-distributed pad length capped at 256: roughly 67% of pads are
        # up to 10 bytes, 25% up to 100 bytes, with rare outliers up to 256.
        pad_length = min(np.random.zipf(1.6, 1)[0], 256)
        self.pad = pad_length * random.choice(string.ascii_letters)
        super().__init__(capabilities)

    def __str__(self) -> str:
        return f"{Action.__str__(self)} {self.topic.name}"
class KafkaInsert(Ingest):
    """Inserts data into a Kafka topic."""

    def parallel(self) -> bool:
        # Synchronous by default; KafkaInsertParallel overrides this.
        return False

    def run(self, c: Composition) -> None:
        previous_max = self.topic.watermarks.max
        # Advance the high watermark by the number of records we will insert.
        self.topic.watermarks.max = previous_max + self.delta
        assert self.topic.watermarks.max >= 0
        assert self.topic.watermarks.min >= 0
        script = SCHEMA + dedent(
            f"""
            $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} start-iteration={previous_max + 1} repeat={self.delta}
            {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad" : "{self.pad}"}}
            """
        )
        if not self.parallel():
            c.testdrive(script)
        else:
            threading.Thread(target=c.testdrive, args=[script]).start()
class KafkaInsertParallel(KafkaInsert):
    """Inserts data into a Kafka topic using background threads."""

    def parallel(self) -> bool:
        # Run the ingestion in a background thread (see KafkaInsert.run).
        return True

    @classmethod
    def require_explicit_mention(cls) -> bool:
        # Opt-in only: this action runs solely when named explicitly.
        return True
class KafkaUpsertFromHead(Ingest):
    """Updates records from the head in-place by modifying their pad"""

    def run(self, c: Composition) -> None:
        # In-place updates require ENVELOPE UPSERT.
        if self.topic.envelope is Envelope.NONE:
            return
        top = self.topic.watermarks.max
        # Re-emit up to `delta` keys just below the head, clamped to the tail.
        first_key = max(top - self.delta, self.topic.watermarks.min)
        count = top - first_key
        if count <= 0:
            return
        c.testdrive(
            SCHEMA
            + dedent(
                f"""
                $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={first_key} repeat={count}
                {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
                """
            )
        )
class KafkaDeleteFromHead(Ingest):
    """Deletes the largest values previously inserted."""

    def run(self, c: Composition) -> None:
        # Deletions (key-only records) require ENVELOPE UPSERT.
        if self.topic.envelope is Envelope.NONE:
            return
        old_max = self.topic.watermarks.max
        # Pull the high watermark down, never past the low watermark.
        self.topic.watermarks.max = max(
            old_max - self.delta, self.topic.watermarks.min
        )
        assert self.topic.watermarks.max >= 0
        assert self.topic.watermarks.min >= 0
        removed = old_max - self.topic.watermarks.max
        if removed <= 0:
            return
        # A key with no value is a deletion under the UPSERT envelope.
        c.testdrive(
            SCHEMA
            + dedent(
                f"""
                $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={self.topic.watermarks.max + 1} repeat={removed}
                {{"key": ${{kafka-ingest.iteration}}}}
                """
            )
        )
class KafkaUpsertFromTail(Ingest):
    """Updates records from the tail in-place by modifying their pad"""

    def run(self, c: Composition) -> None:
        # In-place updates require ENVELOPE UPSERT.
        if self.topic.envelope is Envelope.NONE:
            return
        bottom = self.topic.watermarks.min
        # Re-emit up to `delta` keys just above the tail, clamped to the head.
        last_key = min(bottom + self.delta, self.topic.watermarks.max)
        count = last_key - bottom
        if count <= 0:
            return
        c.testdrive(
            SCHEMA
            + dedent(
                f"""
                $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={bottom} repeat={count}
                {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}}
                """
            )
        )
class KafkaDeleteFromTail(Ingest):
    """Deletes the smallest values previously inserted."""

    def run(self, c: Composition) -> None:
        # Deletions (key-only records) require ENVELOPE UPSERT.
        if self.topic.envelope is Envelope.NONE:
            return
        old_min = self.topic.watermarks.min
        # Push the low watermark up, never past the high watermark.
        self.topic.watermarks.min = min(
            old_min + self.delta, self.topic.watermarks.max
        )
        assert self.topic.watermarks.max >= 0
        assert self.topic.watermarks.min >= 0
        removed = self.topic.watermarks.min - old_min
        if removed <= 0:
            return
        # A key with no value is a deletion under the UPSERT envelope.
        c.testdrive(
            SCHEMA
            + dedent(
                f"""
                $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={old_min} repeat={removed}
                {{"key": ${{kafka-ingest.iteration}}}}
                """
            )
        )
Classes
class CreateTopic (capabilities: Capabilities, topic: TopicExists)
-
Base class for an action that a Zippy test can take.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class CreateTopic(Action): def __init__(self, capabilities: Capabilities, topic: TopicExists) -> None: self.topic = topic super().__init__(capabilities) def provides(self) -> list[Capability]: return [self.topic] def run(self, c: Composition) -> None: c.testdrive( SCHEMA + dedent( f""" $ kafka-create-topic topic={self.topic.name} partitions={self.topic.partitions} $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} repeat=1 {{"key": 0}} {{"f1": 0, "pad": ""}} """ ) )
Ancestors
Inherited members
class CreateTopicParameterized (max_topics: int = 10, envelopes_with_weights: dict[Envelope, int] = {<Envelope.NONE: 1>: 25, <Envelope.UPSERT: 2>: 75})
-
Creates a Kafka topic and decides on the envelope that will be used.
Expand source code Browse git
class CreateTopicParameterized(ActionFactory): """Creates a Kafka topic and decides on the envelope that will be used.""" @classmethod def requires(cls) -> set[type[Capability]]: return {MzIsRunning, KafkaRunning} def __init__( self, max_topics: int = 10, envelopes_with_weights: dict[Envelope, int] = { Envelope.NONE: 25, Envelope.UPSERT: 75, }, ) -> None: self.max_topics = max_topics self.envelopes_with_weights = envelopes_with_weights def new(self, capabilities: Capabilities) -> list[Action]: new_topic_name = capabilities.get_free_capability_name( TopicExists, self.max_topics ) if new_topic_name: return [ CreateTopic( capabilities=capabilities, topic=TopicExists( name=new_topic_name, envelope=random.choices( list(self.envelopes_with_weights.keys()), weights=list(self.envelopes_with_weights.values()), )[0], partitions=random.randint(1, 10), ), ) ] else: return []
Ancestors
Methods
def new(self, capabilities: Capabilities) ‑> list[Action]
-
Expand source code Browse git
def new(self, capabilities: Capabilities) -> list[Action]: new_topic_name = capabilities.get_free_capability_name( TopicExists, self.max_topics ) if new_topic_name: return [ CreateTopic( capabilities=capabilities, topic=TopicExists( name=new_topic_name, envelope=random.choices( list(self.envelopes_with_weights.keys()), weights=list(self.envelopes_with_weights.values()), )[0], partitions=random.randint(1, 10), ), ) ] else: return []
Inherited members
class Ingest (capabilities: Capabilities)
-
Ingests data (inserts, updates or deletions) into a Kafka topic.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class Ingest(Action): """Ingests data (inserts, updates or deletions) into a Kafka topic.""" @classmethod def requires(cls) -> set[type[Capability]]: return {MzIsRunning, KafkaRunning, TopicExists} def __init__(self, capabilities: Capabilities) -> None: self.topic = random.choice(capabilities.get(TopicExists)) self.delta = random.randint(1, 10000) # This gives 67% pads of up to 10 bytes, 25% of up to 100 bytes and outliers up to 256 bytes self.pad = min(np.random.zipf(1.6, 1)[0], 256) * random.choice( string.ascii_letters ) super().__init__(capabilities) def __str__(self) -> str: return f"{Action.__str__(self)} {self.topic.name}"
Ancestors
Subclasses
Inherited members
class KafkaDeleteFromHead (capabilities: Capabilities)
-
Deletes the largest values previously inserted.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class KafkaDeleteFromHead(Ingest): """Deletes the largest values previously inserted.""" def run(self, c: Composition) -> None: if self.topic.envelope is Envelope.NONE: return prev_max = self.topic.watermarks.max self.topic.watermarks.max = max( prev_max - self.delta, self.topic.watermarks.min ) assert self.topic.watermarks.max >= 0 assert self.topic.watermarks.min >= 0 actual_delta = prev_max - self.topic.watermarks.max if actual_delta > 0: c.testdrive( SCHEMA + dedent( f""" $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={self.topic.watermarks.max + 1} repeat={actual_delta} {{"key": ${{kafka-ingest.iteration}}}} """ ) )
Ancestors
Inherited members
class KafkaDeleteFromTail (capabilities: Capabilities)
-
Deletes the smallest values previously inserted.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class KafkaDeleteFromTail(Ingest): """Deletes the smallest values previously inserted.""" def run(self, c: Composition) -> None: if self.topic.envelope is Envelope.NONE: return prev_min = self.topic.watermarks.min self.topic.watermarks.min = min( prev_min + self.delta, self.topic.watermarks.max ) assert self.topic.watermarks.max >= 0 assert self.topic.watermarks.min >= 0 actual_delta = self.topic.watermarks.min - prev_min if actual_delta > 0: c.testdrive( SCHEMA + dedent( f""" $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={prev_min} repeat={actual_delta} {{"key": ${{kafka-ingest.iteration}}}} """ ) )
Ancestors
Inherited members
class KafkaInsert (capabilities: Capabilities)
-
Inserts data into a Kafka topic.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class KafkaInsert(Ingest): """Inserts data into a Kafka topic.""" def parallel(self) -> bool: return False def run(self, c: Composition) -> None: prev_max = self.topic.watermarks.max self.topic.watermarks.max = prev_max + self.delta assert self.topic.watermarks.max >= 0 assert self.topic.watermarks.min >= 0 testdrive_str = SCHEMA + dedent( f""" $ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} start-iteration={prev_max + 1} repeat={self.delta} {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad" : "{self.pad}"}} """ ) if self.parallel(): threading.Thread(target=c.testdrive, args=[testdrive_str]).start() else: c.testdrive(testdrive_str)
Ancestors
Subclasses
Methods
def parallel(self) ‑> bool
-
Expand source code Browse git
def parallel(self) -> bool: return False
Inherited members
class KafkaInsertParallel (capabilities: Capabilities)
-
Inserts data into a Kafka topic using background threads.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class KafkaInsertParallel(KafkaInsert): """Inserts data into a Kafka topic using background threads.""" @classmethod def require_explicit_mention(cls) -> bool: return True def parallel(self) -> bool: return True
Ancestors
Methods
def parallel(self) ‑> bool
-
Expand source code Browse git
def parallel(self) -> bool: return True
Inherited members
class KafkaStart (capabilities: Capabilities)
-
Start a Kafka instance.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class KafkaStart(Action): """Start a Kafka instance.""" def provides(self) -> list[Capability]: return [KafkaRunning()] def run(self, c: Composition) -> None: c.up("redpanda")
Ancestors
Inherited members
class KafkaStop (capabilities: Capabilities)
-
Stop the Kafka instance.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class KafkaStop(Action): """Stop the Kafka instance.""" @classmethod def requires(cls) -> set[type[Capability]]: return {KafkaRunning} def withholds(self) -> set[type[Capability]]: return {KafkaRunning} def run(self, c: Composition) -> None: c.kill("redpanda")
Ancestors
Inherited members
class KafkaUpsertFromHead (capabilities: Capabilities)
-
Updates records from the head in-place by modifying their pad
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class KafkaUpsertFromHead(Ingest): """Updates records from the head in-place by modifying their pad""" def run(self, c: Composition) -> None: if self.topic.envelope is Envelope.NONE: return head = self.topic.watermarks.max start = max(head - self.delta, self.topic.watermarks.min) actual_delta = head - start if actual_delta > 0: c.testdrive( SCHEMA + dedent( f""" $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={start} repeat={actual_delta} {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}} """ ) )
Ancestors
Inherited members
class KafkaUpsertFromTail (capabilities: Capabilities)
-
Updates records from the tail in-place by modifying their pad
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class KafkaUpsertFromTail(Ingest): """Updates records from the tail in-place by modifying their pad""" def run(self, c: Composition) -> None: if self.topic.envelope is Envelope.NONE: return tail = self.topic.watermarks.min end = min(tail + self.delta, self.topic.watermarks.max) actual_delta = end - tail if actual_delta > 0: c.testdrive( SCHEMA + dedent( f""" $ kafka-ingest format=avro topic={self.topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={tail} repeat={actual_delta} {{"key": ${{kafka-ingest.iteration}}}} {{"f1": ${{kafka-ingest.iteration}}, "pad": "{self.pad}"}} """ ) )
Ancestors
Inherited members