Module materialize.zippy.source_actions
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import random
from typing import List, Set, Type
from materialize.mzcompose import Composition
from materialize.zippy.framework import Action, Capabilities, Capability
from materialize.zippy.kafka_capabilities import KafkaRunning, TopicExists
from materialize.zippy.mz_capabilities import MzIsRunning
from materialize.zippy.source_capabilities import SourceExists
class CreateSource(Action):
@classmethod
def requires(self) -> Set[Type[Capability]]:
return {MzIsRunning, KafkaRunning, TopicExists}
def __init__(self, capabilities: Capabilities) -> None:
source_name = "source" + str(random.randint(1, 10))
this_source = SourceExists(name=source_name)
existing_sources = [
s for s in capabilities.get(SourceExists) if s.name == this_source.name
]
if len(existing_sources) == 0:
self.new_source = True
self.source = this_source
self.topic = random.choice(capabilities.get(TopicExists))
self.source.topic = self.topic
elif len(existing_sources) == 1:
self.new_source = False
self.source = existing_sources[0]
assert self.source.topic is not None
self.topic = self.source.topic
else:
assert False
def run(self, c: Composition) -> None:
if self.new_source:
c.testdrive(
f"""
> CREATE MATERIALIZED SOURCE {self.source.name}
FROM KAFKA BROKER '${{testdrive.kafka-addr}}'
TOPIC 'testdrive-{self.topic.name}-${{testdrive.seed}}'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY '${{testdrive.schema-registry-url}}'
ENVELOPE UPSERT
"""
)
def provides(self) -> List[Capability]:
return [self.source] if self.new_source else []
Classes
class CreateSource (capabilities: Capabilities)
-
Base class for an action that a Zippy test can take.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class CreateSource(Action): @classmethod def requires(self) -> Set[Type[Capability]]: return {MzIsRunning, KafkaRunning, TopicExists} def __init__(self, capabilities: Capabilities) -> None: source_name = "source" + str(random.randint(1, 10)) this_source = SourceExists(name=source_name) existing_sources = [ s for s in capabilities.get(SourceExists) if s.name == this_source.name ] if len(existing_sources) == 0: self.new_source = True self.source = this_source self.topic = random.choice(capabilities.get(TopicExists)) self.source.topic = self.topic elif len(existing_sources) == 1: self.new_source = False self.source = existing_sources[0] assert self.source.topic is not None self.topic = self.source.topic else: assert False def run(self, c: Composition) -> None: if self.new_source: c.testdrive( f""" > CREATE MATERIALIZED SOURCE {self.source.name} FROM KAFKA BROKER '${{testdrive.kafka-addr}}' TOPIC 'testdrive-{self.topic.name}-${{testdrive.seed}}' FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY '${{testdrive.schema-registry-url}}' ENVELOPE UPSERT """ ) def provides(self) -> List[Capability]: return [self.source] if self.new_source else []
Ancestors
Inherited members