Module materialize.zippy.sink_actions
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import random
from textwrap import dedent
from materialize.mzcompose.composition import Composition
from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
from materialize.zippy.framework import Action, ActionFactory, Capabilities, Capability
from materialize.zippy.mz_capabilities import MzIsRunning
from materialize.zippy.replica_capabilities import source_capable_clusters
from materialize.zippy.sink_capabilities import SinkExists
from materialize.zippy.storaged_capabilities import StoragedRunning
from materialize.zippy.view_capabilities import ViewExists
class CreateSinkParameterized(ActionFactory):
    """Factory that sinks an existing view to Kafka and reads the topic back.

    Each produced action creates a sink over an existing view, then a source
    over that sink's topic, and finally a view over that source.
    """

    @classmethod
    def requires(cls) -> list[set[type[Capability]]]:
        # One requirement set: every capability listed here must be present.
        needed: set[type[Capability]] = {
            BalancerdIsRunning,
            MzIsRunning,
            StoragedRunning,
            ViewExists,
        }
        return [needed]

    def __init__(self, max_sinks: int = 10) -> None:
        # Limit handed to get_free_capability_name() when allocating SinkExists names.
        self.max_sinks = max_sinks

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Propose one CreateSink action, or none if no sink name is free.

        The upstream view and the two clusters (stored as cluster_name_out and
        cluster_name_in) are picked at random from the current capabilities.
        """
        sink_name = capabilities.get_free_capability_name(SinkExists, self.max_sinks)
        if not sink_name:
            return []

        # Keep these random draws in this exact order so seeded runs stay reproducible.
        upstream_view = random.choice(capabilities.get(ViewExists))
        out_cluster = random.choice(source_capable_clusters(capabilities))
        in_cluster = random.choice(source_capable_clusters(capabilities))

        # The read-back view mirrors the upstream view's aggregate flavour.
        readback_view = ViewExists(
            name=f"{sink_name}_view",
            inputs=[upstream_view],
            expensive_aggregates=upstream_view.expensive_aggregates,
        )
        sink = SinkExists(
            name=sink_name,
            source_view=upstream_view,
            dest_view=readback_view,
            cluster_name_out=out_cluster,
            cluster_name_in=in_cluster,
        )
        return [CreateSink(sink=sink, capabilities=capabilities)]
class CreateSink(Action):
    """Sink an existing view to Kafka, ingest the topic again, and aggregate it.

    Instances are produced by CreateSinkParameterized, which supplies the
    SinkExists capability naming every object this action creates.
    """

    def __init__(
        self,
        sink: SinkExists,
        capabilities: Capabilities,
    ) -> None:
        assert (
            sink is not None
        ), "CreateSink Action can not be referenced directly, it is produced by CreateSinkParameterized factory"
        self.sink = sink
        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        """Run a testdrive script creating the connections, sink, source and view."""
        sink = self.sink
        name = sink.name
        view_name = sink.dest_view.name

        # Draw the interval before choosing the strategy: this is the same
        # sequence of random calls as building the candidate list inline.
        seconds = random.randint(1, 5)
        refresh = random.choice(["ON COMMIT", f"EVERY '{seconds} seconds'"])

        # The re-ingested topic carries before/after records (ENVELOPE DEBEZIUM
        # on the sink), so summing every `after` and subtracting every `before`
        # yields a "normal" ViewExists output.
        if sink.dest_view.expensive_aggregates:
            dest_view_template = f"""
                > CREATE MATERIALIZED VIEW {view_name}
                  WITH (REFRESH {refresh}) AS
                  SELECT SUM(count_all)::int AS count_all, SUM(count_distinct)::int AS count_distinct, SUM(min_value)::int AS min_value, SUM(max_value)::int AS max_value FROM (
                    SELECT (after).count_all, (after).count_distinct, (after).min_value, (after).max_value FROM {name}_source
                    UNION ALL
                    SELECT -(before).count_all, -(before).count_distinct, -(before).min_value, -(before).max_value FROM {name}_source
                  );
                """
        else:
            # NOTE(review): unlike the branch above, this view has no REFRESH
            # clause, so `refresh` is drawn but unused here; confirm intent.
            dest_view_template = f"""
                > CREATE MATERIALIZED VIEW {view_name} AS
                  SELECT SUM(count_all)::int AS count_all FROM (
                    SELECT (after).count_all FROM {name}_source
                    UNION ALL
                    SELECT -(before).count_all FROM {name}_source
                  );
                """
        dest_view_sql = dedent(dest_view_template)

        setup_script = dedent(
            f"""
            > CREATE CONNECTION IF NOT EXISTS {name}_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', PROGRESS TOPIC 'zippy-{name}-${{testdrive.seed}}', SECURITY PROTOCOL PLAINTEXT);
            > CREATE CONNECTION IF NOT EXISTS {name}_csr_conn TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}');
            > CREATE SINK {name}
              IN CLUSTER {sink.cluster_name_out}
              FROM {sink.source_view.name}
              INTO KAFKA CONNECTION {name}_kafka_conn (TOPIC 'sink-{name}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {name}_csr_conn
              ENVELOPE DEBEZIUM;
            $ kafka-verify-topic sink=materialize.public.{name} await-value-schema=true
            # Ingest the sink again in order to be able to validate its contents
            > CREATE SOURCE {name}_source
              IN CLUSTER {sink.cluster_name_in}
              FROM KAFKA CONNECTION {name}_kafka_conn (TOPIC 'sink-{name}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {name}_csr_conn
              ENVELOPE NONE
            """
        )
        c.testdrive(setup_script + dest_view_sql)

    def provides(self) -> list[Capability]:
        """The sink itself plus the view built over its re-ingested topic."""
        sink = self.sink
        return [sink, sink.dest_view]
Classes
class CreateSink (sink: SinkExists, capabilities: Capabilities)
-
Base class for an action that a Zippy test can take.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class CreateSink(Action):
    """Sink an existing view to Kafka, ingest the topic again, and aggregate it.

    Instances are produced by CreateSinkParameterized, which supplies the
    SinkExists capability naming every object this action creates.
    """

    def __init__(
        self,
        sink: SinkExists,
        capabilities: Capabilities,
    ) -> None:
        assert (
            sink is not None
        ), "CreateSink Action can not be referenced directly, it is produced by CreateSinkParameterized factory"
        self.sink = sink
        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        """Run a testdrive script creating the connections, sink, source and view."""
        sink = self.sink
        name = sink.name
        view_name = sink.dest_view.name

        # Draw the interval before choosing the strategy: this is the same
        # sequence of random calls as building the candidate list inline.
        seconds = random.randint(1, 5)
        refresh = random.choice(["ON COMMIT", f"EVERY '{seconds} seconds'"])

        # The re-ingested topic carries before/after records (ENVELOPE DEBEZIUM
        # on the sink), so summing every `after` and subtracting every `before`
        # yields a "normal" ViewExists output.
        if sink.dest_view.expensive_aggregates:
            dest_view_template = f"""
                > CREATE MATERIALIZED VIEW {view_name}
                  WITH (REFRESH {refresh}) AS
                  SELECT SUM(count_all)::int AS count_all, SUM(count_distinct)::int AS count_distinct, SUM(min_value)::int AS min_value, SUM(max_value)::int AS max_value FROM (
                    SELECT (after).count_all, (after).count_distinct, (after).min_value, (after).max_value FROM {name}_source
                    UNION ALL
                    SELECT -(before).count_all, -(before).count_distinct, -(before).min_value, -(before).max_value FROM {name}_source
                  );
                """
        else:
            # NOTE(review): unlike the branch above, this view has no REFRESH
            # clause, so `refresh` is drawn but unused here; confirm intent.
            dest_view_template = f"""
                > CREATE MATERIALIZED VIEW {view_name} AS
                  SELECT SUM(count_all)::int AS count_all FROM (
                    SELECT (after).count_all FROM {name}_source
                    UNION ALL
                    SELECT -(before).count_all FROM {name}_source
                  );
                """
        dest_view_sql = dedent(dest_view_template)

        setup_script = dedent(
            f"""
            > CREATE CONNECTION IF NOT EXISTS {name}_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', PROGRESS TOPIC 'zippy-{name}-${{testdrive.seed}}', SECURITY PROTOCOL PLAINTEXT);
            > CREATE CONNECTION IF NOT EXISTS {name}_csr_conn TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}');
            > CREATE SINK {name}
              IN CLUSTER {sink.cluster_name_out}
              FROM {sink.source_view.name}
              INTO KAFKA CONNECTION {name}_kafka_conn (TOPIC 'sink-{name}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {name}_csr_conn
              ENVELOPE DEBEZIUM;
            $ kafka-verify-topic sink=materialize.public.{name} await-value-schema=true
            # Ingest the sink again in order to be able to validate its contents
            > CREATE SOURCE {name}_source
              IN CLUSTER {sink.cluster_name_in}
              FROM KAFKA CONNECTION {name}_kafka_conn (TOPIC 'sink-{name}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {name}_csr_conn
              ENVELOPE NONE
            """
        )
        c.testdrive(setup_script + dest_view_sql)

    def provides(self) -> list[Capability]:
        """The sink itself plus the view built over its re-ingested topic."""
        sink = self.sink
        return [sink, sink.dest_view]
Ancestors
Inherited members
class CreateSinkParameterized (max_sinks: int = 10)
-
Creates a sink over an existing view. Then creates a source over that sink and a view over that source.
Expand source code Browse git
class CreateSinkParameterized(ActionFactory):
    """Factory that sinks an existing view to Kafka and reads the topic back.

    Each produced action creates a sink over an existing view, then a source
    over that sink's topic, and finally a view over that source.
    """

    @classmethod
    def requires(cls) -> list[set[type[Capability]]]:
        # One requirement set: every capability listed here must be present.
        needed: set[type[Capability]] = {
            BalancerdIsRunning,
            MzIsRunning,
            StoragedRunning,
            ViewExists,
        }
        return [needed]

    def __init__(self, max_sinks: int = 10) -> None:
        # Limit handed to get_free_capability_name() when allocating SinkExists names.
        self.max_sinks = max_sinks

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Propose one CreateSink action, or none if no sink name is free.

        The upstream view and the two clusters (stored as cluster_name_out and
        cluster_name_in) are picked at random from the current capabilities.
        """
        sink_name = capabilities.get_free_capability_name(SinkExists, self.max_sinks)
        if not sink_name:
            return []

        # Keep these random draws in this exact order so seeded runs stay reproducible.
        upstream_view = random.choice(capabilities.get(ViewExists))
        out_cluster = random.choice(source_capable_clusters(capabilities))
        in_cluster = random.choice(source_capable_clusters(capabilities))

        # The read-back view mirrors the upstream view's aggregate flavour.
        readback_view = ViewExists(
            name=f"{sink_name}_view",
            inputs=[upstream_view],
            expensive_aggregates=upstream_view.expensive_aggregates,
        )
        sink = SinkExists(
            name=sink_name,
            source_view=upstream_view,
            dest_view=readback_view,
            cluster_name_out=out_cluster,
            cluster_name_in=in_cluster,
        )
        return [CreateSink(sink=sink, capabilities=capabilities)]
Ancestors
Methods
def new(self, capabilities: Capabilities) ‑> list[Action]
-
Expand source code Browse git
def new(self, capabilities: Capabilities) -> list[Action]:
    """Propose one CreateSink action, or none if no sink name is free.

    The upstream view and the two clusters (stored as cluster_name_out and
    cluster_name_in) are picked at random from the current capabilities.
    """
    sink_name = capabilities.get_free_capability_name(SinkExists, self.max_sinks)
    if not sink_name:
        return []

    # Keep these random draws in this exact order so seeded runs stay reproducible.
    upstream_view = random.choice(capabilities.get(ViewExists))
    out_cluster = random.choice(source_capable_clusters(capabilities))
    in_cluster = random.choice(source_capable_clusters(capabilities))

    # The read-back view mirrors the upstream view's aggregate flavour.
    readback_view = ViewExists(
        name=f"{sink_name}_view",
        inputs=[upstream_view],
        expensive_aggregates=upstream_view.expensive_aggregates,
    )
    sink = SinkExists(
        name=sink_name,
        source_view=upstream_view,
        dest_view=readback_view,
        cluster_name_out=out_cluster,
        cluster_name_in=in_cluster,
    )
    return [CreateSink(sink=sink, capabilities=capabilities)]
Inherited members