Module materialize.zippy.kafka_actions
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import random
from typing import List, Set, Type
from materialize.mzcompose import Composition
from materialize.zippy.framework import Action, Capabilities, Capability
from materialize.zippy.kafka_capabilities import KafkaRunning, TopicExists
from materialize.zippy.mz_capabilities import MzIsRunning
# Testdrive fragment shared by the actions in this module. It sets two
# testdrive variables, `keyschema` and `schema`, each holding an Avro record
# schema named "test" with a single `long` field (`f1` in the key schema,
# `f2` in the value schema). The `kafka-ingest` commands in this module refer
# to them as ${schema} and ${keyschema}.
SCHEMA = """
$ set keyschema={
"type" : "record",
"name" : "test",
"fields" : [
{"name":"f1", "type":"long"}
]
}
$ set schema={
"type" : "record",
"name" : "test",
"fields" : [
{"name":"f2", "type":"long"}
]
}
"""
class KafkaStart(Action):
    """Zippy action that brings up the ``kafka`` service."""

    def provides(self) -> List[Capability]:
        """Return the capabilities this action adds: one ``KafkaRunning``."""
        kafka_running = KafkaRunning()
        return [kafka_running]

    def run(self, c: Composition) -> None:
        """Start ``kafka`` via the composition's ``start_and_wait_for_tcp``."""
        services_to_start = ["kafka"]
        c.start_and_wait_for_tcp(services=services_to_start)
class KafkaStop(Action):
    """Zippy action that kills the ``kafka`` service."""

    @classmethod
    def requires(cls) -> Set[Type[Capability]]:
        """Return the capability types that must be present: ``KafkaRunning``."""
        # The first parameter of a classmethod is the class itself, so it is
        # named `cls`, matching the other actions in this module.
        return {KafkaRunning}

    def removes(self) -> Set[Type[Capability]]:
        """Return the capability types this action takes away: ``KafkaRunning``."""
        return {KafkaRunning}

    def run(self, c: Composition) -> None:
        """Kill the ``kafka`` service through the composition."""
        c.kill("kafka")
class CreateTopic(Action):
    """Zippy action that creates one of ten possible Kafka topics.

    A name ``topic1`` .. ``topic10`` is drawn at random when the action is
    constructed. If a ``TopicExists`` capability with that name is already
    present, the action reuses it and ``run`` does nothing.
    """

    @classmethod
    def requires(cls) -> Set[Type[Capability]]:
        """Return the capability types that must be present before this runs."""
        return {MzIsRunning, KafkaRunning}

    def __init__(self, capabilities: Capabilities) -> None:
        """Pick a topic name and decide whether the topic is new.

        Sets ``self.topic`` to the existing capability when one with the
        drawn name is found, otherwise to a freshly built one, and sets
        ``self.new_topic`` accordingly.

        Raises:
            AssertionError: if more than one ``TopicExists`` capability
                carries the drawn name.
        """
        this_topic = TopicExists(name="topic" + str(random.randint(1, 10)))
        existing_topics = [
            t for t in capabilities.get(TopicExists) if t.name == this_topic.name
        ]

        if len(existing_topics) > 1:
            # Raised explicitly rather than via `assert False`, which is
            # stripped under `python -O` and would leave `self.topic` and
            # `self.new_topic` unset.
            raise AssertionError(
                f"multiple TopicExists capabilities share the name {this_topic.name!r}"
            )

        if existing_topics:
            self.new_topic = False
            self.topic = existing_topics[0]
        else:
            self.new_topic = True
            self.topic = this_topic

    def provides(self) -> List[Capability]:
        """Return the new topic capability, or nothing if the topic existed."""
        return [self.topic] if self.new_topic else []

    def run(self, c: Composition) -> None:
        """Create the topic and ingest one initial record (key 0, value 0).

        Does nothing when the topic already existed at construction time.
        """
        if not self.new_topic:
            return

        c.testdrive(
            f"""
$ kafka-create-topic topic={self.topic.name}
{SCHEMA}
$ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} publish=true repeat=1
{{"f1": 0}} {{"f2": 0}}
"""
        )
class Ingest(Action):
    """Base for actions that operate on a randomly chosen existing topic.

    Construction selects the target topic and a random record count
    (``delta``) between 1 and 100000.
    """

    @classmethod
    def requires(cls) -> Set[Type[Capability]]:
        """Return the capability types that must be present before this runs."""
        needed: Set[Type[Capability]] = {MzIsRunning, KafkaRunning, TopicExists}
        return needed

    def __init__(self, capabilities: Capabilities) -> None:
        """Choose a topic among the ``TopicExists`` capabilities, then a delta."""
        available_topics = capabilities.get(TopicExists)
        # The topic is drawn before the delta; the order of the random calls
        # is kept as is.
        self.topic = random.choice(available_topics)
        self.delta = random.randint(1, 100000)
class KafkaInsert(Ingest):
    """Ingest ``delta`` new records above the topic's current high watermark."""

    def run(self, c: Composition) -> None:
        """Raise the high watermark by ``delta`` and ingest the new records.

        The testdrive ``kafka-ingest`` command is issued with
        ``start-iteration`` one past the previous high watermark and
        ``repeat`` equal to ``delta``; key and value both carry the
        iteration number.
        """
        topic = self.topic
        prev_high = topic.watermarks.high
        topic.watermarks.high = prev_high + self.delta

        assert topic.watermarks.high >= 0
        assert topic.watermarks.low >= 0

        first_iteration = prev_high + 1
        script = f"""
{SCHEMA}
$ kafka-ingest format=avro key-format=avro topic={topic.name} schema=${{schema}} key-schema=${{keyschema}} start-iteration={first_iteration} publish=true repeat={self.delta}
{{"f1": ${{kafka-ingest.iteration}}}} {{"f2": ${{kafka-ingest.iteration}}}}
"""
        c.testdrive(script)
class KafkaDeleteFromHead(Ingest):
    """Remove up to ``delta`` records from the high end of the topic's range."""

    def run(self, c: Composition) -> None:
        """Lower the high watermark and ingest key-only records for the gap.

        The high watermark drops by ``delta`` but never below the low
        watermark. If it actually moved, a ``kafka-ingest`` with only a key
        (no value) is issued for each iteration between the new high
        watermark + 1 and the previous high watermark.
        """
        topic = self.topic
        prev_high = topic.watermarks.high
        lowered = prev_high - self.delta
        # Clamp so the high watermark never falls below the low watermark.
        topic.watermarks.high = max(lowered, topic.watermarks.low)

        assert topic.watermarks.high >= 0
        assert topic.watermarks.low >= 0

        actual_delta = prev_high - topic.watermarks.high
        if actual_delta <= 0:
            return

        first_iteration = topic.watermarks.high + 1
        script = f"""
{SCHEMA}
$ kafka-ingest format=avro topic={topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={first_iteration} publish=true repeat={actual_delta}
{{"f1": ${{kafka-ingest.iteration}}}}
"""
        c.testdrive(script)
class KafkaDeleteFromTail(Ingest):
    """Remove up to ``delta`` records from the low end of the topic's range."""

    def run(self, c: Composition) -> None:
        """Raise the low watermark and ingest key-only records for the gap.

        The low watermark rises by ``delta`` but never above the high
        watermark. If it actually moved, a ``kafka-ingest`` with only a key
        (no value) is issued starting at the previous low watermark and
        repeated once per position the watermark advanced.
        """
        topic = self.topic
        prev_low = topic.watermarks.low
        raised = prev_low + self.delta
        # Clamp so the low watermark never passes the high watermark.
        topic.watermarks.low = min(raised, topic.watermarks.high)

        assert topic.watermarks.high >= 0
        assert topic.watermarks.low >= 0

        actual_delta = topic.watermarks.low - prev_low
        if actual_delta <= 0:
            return

        script = f"""
{SCHEMA}
$ kafka-ingest format=avro topic={topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={prev_low} publish=true repeat={actual_delta}
{{"f1": ${{kafka-ingest.iteration}}}}
"""
        c.testdrive(script)
Classes
class CreateTopic (capabilities: Capabilities)
-
Base class for an action that a Zippy test can take.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class CreateTopic(Action):
    """Zippy action that creates one of ten possible Kafka topics.

    A name ``topic1`` .. ``topic10`` is drawn at random when the action is
    constructed. If a ``TopicExists`` capability with that name is already
    present, the action reuses it and ``run`` does nothing.
    """

    @classmethod
    def requires(cls) -> Set[Type[Capability]]:
        """Return the capability types that must be present before this runs."""
        return {MzIsRunning, KafkaRunning}

    def __init__(self, capabilities: Capabilities) -> None:
        """Pick a topic name and decide whether the topic is new.

        Sets ``self.topic`` to the existing capability when one with the
        drawn name is found, otherwise to a freshly built one, and sets
        ``self.new_topic`` accordingly.

        Raises:
            AssertionError: if more than one ``TopicExists`` capability
                carries the drawn name.
        """
        this_topic = TopicExists(name="topic" + str(random.randint(1, 10)))
        existing_topics = [
            t for t in capabilities.get(TopicExists) if t.name == this_topic.name
        ]

        if len(existing_topics) > 1:
            # Raised explicitly rather than via `assert False`, which is
            # stripped under `python -O` and would leave `self.topic` and
            # `self.new_topic` unset.
            raise AssertionError(
                f"multiple TopicExists capabilities share the name {this_topic.name!r}"
            )

        if existing_topics:
            self.new_topic = False
            self.topic = existing_topics[0]
        else:
            self.new_topic = True
            self.topic = this_topic

    def provides(self) -> List[Capability]:
        """Return the new topic capability, or nothing if the topic existed."""
        return [self.topic] if self.new_topic else []

    def run(self, c: Composition) -> None:
        """Create the topic and ingest one initial record (key 0, value 0).

        Does nothing when the topic already existed at construction time.
        """
        if not self.new_topic:
            return

        c.testdrive(
            f"""
$ kafka-create-topic topic={self.topic.name}
{SCHEMA}
$ kafka-ingest format=avro key-format=avro topic={self.topic.name} schema=${{schema}} key-schema=${{keyschema}} publish=true repeat=1
{{"f1": 0}} {{"f2": 0}}
"""
        )
Ancestors
Inherited members
class Ingest (capabilities: Capabilities)
-
Base class for an action that a Zippy test can take.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class Ingest(Action):
    """Base for actions that operate on a randomly chosen existing topic.

    Construction selects the target topic and a random record count
    (``delta``) between 1 and 100000.
    """

    @classmethod
    def requires(cls) -> Set[Type[Capability]]:
        """Return the capability types that must be present before this runs."""
        needed: Set[Type[Capability]] = {MzIsRunning, KafkaRunning, TopicExists}
        return needed

    def __init__(self, capabilities: Capabilities) -> None:
        """Choose a topic among the ``TopicExists`` capabilities, then a delta."""
        available_topics = capabilities.get(TopicExists)
        # The topic is drawn before the delta; the order of the random calls
        # is kept as is.
        self.topic = random.choice(available_topics)
        self.delta = random.randint(1, 100000)
Ancestors
Subclasses
Inherited members
class KafkaDeleteFromHead (capabilities: Capabilities)
-
Base class for an action that a Zippy test can take.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class KafkaDeleteFromHead(Ingest):
    """Remove up to ``delta`` records from the high end of the topic's range."""

    def run(self, c: Composition) -> None:
        """Lower the high watermark and ingest key-only records for the gap.

        The high watermark drops by ``delta`` but never below the low
        watermark. If it actually moved, a ``kafka-ingest`` with only a key
        (no value) is issued for each iteration between the new high
        watermark + 1 and the previous high watermark.
        """
        topic = self.topic
        prev_high = topic.watermarks.high
        lowered = prev_high - self.delta
        # Clamp so the high watermark never falls below the low watermark.
        topic.watermarks.high = max(lowered, topic.watermarks.low)

        assert topic.watermarks.high >= 0
        assert topic.watermarks.low >= 0

        actual_delta = prev_high - topic.watermarks.high
        if actual_delta <= 0:
            return

        first_iteration = topic.watermarks.high + 1
        script = f"""
{SCHEMA}
$ kafka-ingest format=avro topic={topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={first_iteration} publish=true repeat={actual_delta}
{{"f1": ${{kafka-ingest.iteration}}}}
"""
        c.testdrive(script)
Ancestors
Inherited members
class KafkaDeleteFromTail (capabilities: Capabilities)
-
Base class for an action that a Zippy test can take.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class KafkaDeleteFromTail(Ingest):
    """Remove up to ``delta`` records from the low end of the topic's range."""

    def run(self, c: Composition) -> None:
        """Raise the low watermark and ingest key-only records for the gap.

        The low watermark rises by ``delta`` but never above the high
        watermark. If it actually moved, a ``kafka-ingest`` with only a key
        (no value) is issued starting at the previous low watermark and
        repeated once per position the watermark advanced.
        """
        topic = self.topic
        prev_low = topic.watermarks.low
        raised = prev_low + self.delta
        # Clamp so the low watermark never passes the high watermark.
        topic.watermarks.low = min(raised, topic.watermarks.high)

        assert topic.watermarks.high >= 0
        assert topic.watermarks.low >= 0

        actual_delta = topic.watermarks.low - prev_low
        if actual_delta <= 0:
            return

        script = f"""
{SCHEMA}
$ kafka-ingest format=avro topic={topic.name} key-format=avro key-schema=${{keyschema}} schema=${{schema}} start-iteration={prev_low} publish=true repeat={actual_delta}
{{"f1": ${{kafka-ingest.iteration}}}}
"""
        c.testdrive(script)
Ancestors
Inherited members
class KafkaInsert (capabilities: Capabilities)
-
Base class for an action that a Zippy test can take.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class KafkaInsert(Ingest):
    """Ingest ``delta`` new records above the topic's current high watermark."""

    def run(self, c: Composition) -> None:
        """Raise the high watermark by ``delta`` and ingest the new records.

        The testdrive ``kafka-ingest`` command is issued with
        ``start-iteration`` one past the previous high watermark and
        ``repeat`` equal to ``delta``; key and value both carry the
        iteration number.
        """
        topic = self.topic
        prev_high = topic.watermarks.high
        topic.watermarks.high = prev_high + self.delta

        assert topic.watermarks.high >= 0
        assert topic.watermarks.low >= 0

        first_iteration = prev_high + 1
        script = f"""
{SCHEMA}
$ kafka-ingest format=avro key-format=avro topic={topic.name} schema=${{schema}} key-schema=${{keyschema}} start-iteration={first_iteration} publish=true repeat={self.delta}
{{"f1": ${{kafka-ingest.iteration}}}} {{"f2": ${{kafka-ingest.iteration}}}}
"""
        c.testdrive(script)
Ancestors
Inherited members
class KafkaStart (capabilities: Capabilities)
-
Base class for an action that a Zippy test can take.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class KafkaStart(Action):
    """Zippy action that brings up the ``kafka`` service."""

    def provides(self) -> List[Capability]:
        """Return the capabilities this action adds: one ``KafkaRunning``."""
        kafka_running = KafkaRunning()
        return [kafka_running]

    def run(self, c: Composition) -> None:
        """Start ``kafka`` via the composition's ``start_and_wait_for_tcp``."""
        services_to_start = ["kafka"]
        c.start_and_wait_for_tcp(services=services_to_start)
Ancestors
Inherited members
class KafkaStop (capabilities: Capabilities)
-
Base class for an action that a Zippy test can take.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class KafkaStop(Action):
    """Zippy action that kills the ``kafka`` service."""

    @classmethod
    def requires(cls) -> Set[Type[Capability]]:
        """Return the capability types that must be present: ``KafkaRunning``."""
        # The first parameter of a classmethod is the class itself, so it is
        # named `cls`, matching the other actions in this module.
        return {KafkaRunning}

    def removes(self) -> Set[Type[Capability]]:
        """Return the capability types this action takes away: ``KafkaRunning``."""
        return {KafkaRunning}

    def run(self, c: Composition) -> None:
        """Kill the ``kafka`` service through the composition."""
        c.kill("kafka")
Ancestors
Inherited members