Module materialize.zippy.debezium_actions
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import random
from textwrap import dedent
from materialize.mzcompose.composition import Composition
from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
from materialize.zippy.debezium_capabilities import (
DebeziumRunning,
DebeziumSourceExists,
PostgresTableExists,
)
from materialize.zippy.framework import Action, Capabilities, Capability
from materialize.zippy.kafka_capabilities import KafkaRunning
from materialize.zippy.mz_capabilities import MzIsRunning
from materialize.zippy.replica_capabilities import source_capable_clusters
from materialize.zippy.storaged_capabilities import StoragedRunning
class DebeziumStart(Action):
"""Start a Debezium instance."""
def provides(self) -> list[Capability]:
return [DebeziumRunning()]
def run(self, c: Composition) -> None:
c.up("debezium")
class DebeziumStop(Action):
"""Stop the Debezium instance."""
@classmethod
def requires(cls) -> set[type[Capability]]:
return {DebeziumRunning}
def withholds(self) -> set[type[Capability]]:
return {DebeziumRunning}
def run(self, c: Composition) -> None:
c.kill("debezium")
class CreateDebeziumSource(Action):
"""Creates a Debezium source in Materialized."""
@classmethod
def requires(cls) -> set[type[Capability]]:
return {
BalancerdIsRunning,
MzIsRunning,
StoragedRunning,
KafkaRunning,
PostgresTableExists,
}
def __init__(self, capabilities: Capabilities) -> None:
# To avoid conflicts, we make sure the postgres table and the debezium source have matching names
postgres_table = random.choice(capabilities.get(PostgresTableExists))
cluster_name = random.choice(source_capable_clusters(capabilities))
debezium_source_name = f"debezium_source_{postgres_table.name}"
this_debezium_source = DebeziumSourceExists(name=debezium_source_name)
existing_debezium_sources = [
s
for s in capabilities.get(DebeziumSourceExists)
if s.name == this_debezium_source.name
]
if len(existing_debezium_sources) == 0:
self.new_debezium_source = True
self.debezium_source = this_debezium_source
self.postgres_table = postgres_table
self.debezium_source.postgres_table = self.postgres_table
self.cluster_name = cluster_name
elif len(existing_debezium_sources) == 1:
self.new_debezium_source = False
self.debezium_source = existing_debezium_sources[0]
assert self.debezium_source.postgres_table is not None
self.postgres_table = self.debezium_source.postgres_table
else:
assert False
super().__init__(capabilities)
def run(self, c: Composition) -> None:
if self.new_debezium_source:
c.testdrive(
dedent(
f"""
$ http-request method=POST url=http://debezium:8083/connectors content-type=application/json
{{
"name": "{self.debezium_source.name}",
"config": {{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "postgres",
"schema.include.list": "public",
"table.include.list": "public.{self.postgres_table.name}",
"plugin.name": "pgoutput",
"publication.name": "dbz_publication_{self.debezium_source.name}",
"publication.autocreate.mode": "filtered",
"slot.name" : "slot_{self.postgres_table.name}",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.history",
"truncate.handling.mode": "include",
"decimal.handling.mode": "precise",
"topic.prefix": "postgres"
}}
}}
$ schema-registry-wait topic=postgres.public.{self.postgres_table.name}
> CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}');
> CREATE SOURCE {self.debezium_source.name}
IN CLUSTER {self.cluster_name}
FROM KAFKA CONNECTION kafka_conn (TOPIC 'postgres.public.{self.postgres_table.name}')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE DEBEZIUM
"""
)
)
def provides(self) -> list[Capability]:
return [self.debezium_source] if self.new_debezium_source else []
Classes
class CreateDebeziumSource (capabilities: Capabilities)
-
Creates a Debezium source in Materialized.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class CreateDebeziumSource(Action): """Creates a Debezium source in Materialized.""" @classmethod def requires(cls) -> set[type[Capability]]: return { BalancerdIsRunning, MzIsRunning, StoragedRunning, KafkaRunning, PostgresTableExists, } def __init__(self, capabilities: Capabilities) -> None: # To avoid conflicts, we make sure the postgres table and the debezium source have matching names postgres_table = random.choice(capabilities.get(PostgresTableExists)) cluster_name = random.choice(source_capable_clusters(capabilities)) debezium_source_name = f"debezium_source_{postgres_table.name}" this_debezium_source = DebeziumSourceExists(name=debezium_source_name) existing_debezium_sources = [ s for s in capabilities.get(DebeziumSourceExists) if s.name == this_debezium_source.name ] if len(existing_debezium_sources) == 0: self.new_debezium_source = True self.debezium_source = this_debezium_source self.postgres_table = postgres_table self.debezium_source.postgres_table = self.postgres_table self.cluster_name = cluster_name elif len(existing_debezium_sources) == 1: self.new_debezium_source = False self.debezium_source = existing_debezium_sources[0] assert self.debezium_source.postgres_table is not None self.postgres_table = self.debezium_source.postgres_table else: assert False super().__init__(capabilities) def run(self, c: Composition) -> None: if self.new_debezium_source: c.testdrive( dedent( f""" $ http-request method=POST url=http://debezium:8083/connectors content-type=application/json {{ "name": "{self.debezium_source.name}", "config": {{ "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgres", "database.password": "postgres", "database.dbname" : "postgres", "database.server.name": "postgres", "schema.include.list": "public", "table.include.list": "public.{self.postgres_table.name}", "plugin.name": "pgoutput", "publication.name": "dbz_publication_{self.debezium_source.name}", "publication.autocreate.mode": "filtered", "slot.name" : "slot_{self.postgres_table.name}", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "schema-changes.history", "truncate.handling.mode": "include", "decimal.handling.mode": "precise", "topic.prefix": "postgres" }} }} $ schema-registry-wait topic=postgres.public.{self.postgres_table.name} > CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT); > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}'); > CREATE SOURCE {self.debezium_source.name} IN CLUSTER {self.cluster_name} FROM KAFKA CONNECTION kafka_conn (TOPIC 'postgres.public.{self.postgres_table.name}') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE DEBEZIUM """ ) ) def provides(self) -> list[Capability]: return [self.debezium_source] if self.new_debezium_source else []
Ancestors
Inherited members
class DebeziumStart (capabilities: Capabilities)
-
Start a Debezium instance.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class DebeziumStart(Action): """Start a Debezium instance.""" def provides(self) -> list[Capability]: return [DebeziumRunning()] def run(self, c: Composition) -> None: c.up("debezium")
Ancestors
Inherited members
class DebeziumStop (capabilities: Capabilities)
-
Stop the Debezium instance.
Construct a new action, possibly conditioning on the available capabilities.
Expand source code Browse git
class DebeziumStop(Action): """Stop the Debezium instance.""" @classmethod def requires(cls) -> set[type[Capability]]: return {DebeziumRunning} def withholds(self) -> set[type[Capability]]: return {DebeziumRunning} def run(self, c: Composition) -> None: c.kill("debezium")
Ancestors
Inherited members