Module materialize.zippy.pg_cdc_actions

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.


import random
from textwrap import dedent

from materialize.mzcompose.composition import Composition
from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
from materialize.zippy.framework import Action, Capabilities, Capability
from materialize.zippy.mz_capabilities import MzIsRunning
from materialize.zippy.pg_cdc_capabilities import PostgresCdcTableExists
from materialize.zippy.postgres_capabilities import PostgresRunning, PostgresTableExists
from materialize.zippy.replica_capabilities import source_capable_clusters
from materialize.zippy.storaged_capabilities import StoragedRunning


class CreatePostgresCdcTable(Action):
    """Creates a Postgres CDC source in Materialized.

    A Postgres table is picked at random from the available
    ``PostgresTableExists`` capabilities and the CDC object is named
    ``postgres_<table name>``. If no ``PostgresCdcTableExists`` capability
    with that name is present yet, ``run`` creates the publication, secret,
    connection and source; otherwise the action does nothing and provides
    no new capability.
    """

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capability types this action depends on."""
        return {
            BalancerdIsRunning,
            MzIsRunning,
            StoragedRunning,
            PostgresRunning,
            PostgresTableExists,
        }

    def __init__(self, capabilities: Capabilities) -> None:
        """Choose the upstream table and decide whether a new source is needed.

        Raises:
            AssertionError: if more than one ``PostgresCdcTableExists``
                capability already carries the derived name.
        """
        postgres_table = random.choice(capabilities.get(PostgresTableExists))
        postgres_pg_cdc_name = f"postgres_{postgres_table.name}"
        this_postgres_cdc_table = PostgresCdcTableExists(name=postgres_pg_cdc_name)
        # The cluster is drawn even when the CDC table already exists, so the
        # number of random draws does not depend on the branch taken below.
        cluster_name = random.choice(source_capable_clusters(capabilities))

        existing_postgres_cdc_tables = [
            s
            for s in capabilities.get(PostgresCdcTableExists)
            if s.name == postgres_pg_cdc_name
        ]

        if not existing_postgres_cdc_tables:
            self.new_postgres_cdc_table = True

            self.postgres_cdc_table = this_postgres_cdc_table
            self.postgres_cdc_table.postgres_table = postgres_table
            # Only set (and only read by run()) when a new source is created.
            self.cluster_name = cluster_name
        elif len(existing_postgres_cdc_tables) == 1:
            self.new_postgres_cdc_table = False

            self.postgres_cdc_table = existing_postgres_cdc_tables[0]
        else:
            # Raise explicitly instead of `assert False`: assert statements
            # are removed under `python -O`, which would leave this instance
            # half-initialized and fail later with an unrelated error.
            raise AssertionError(
                f"expected at most one PostgresCdcTableExists named "
                f"{postgres_pg_cdc_name!r}, found {len(existing_postgres_cdc_tables)}"
            )

        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        """Create the publication, secret, connection and source via testdrive.

        Does nothing when the CDC table was already present at construction
        time.
        """
        if self.new_postgres_cdc_table:
            assert self.postgres_cdc_table is not None
            assert self.postgres_cdc_table.postgres_table is not None
            name = self.postgres_cdc_table.name
            c.testdrive(
                dedent(
                    f"""
                    $ postgres-execute connection=postgres://postgres:postgres@postgres

                    CREATE PUBLICATION {name}_publication FOR TABLE {self.postgres_cdc_table.postgres_table.name};


                    > CREATE SECRET {name}_password AS 'postgres';
                    > CREATE CONNECTION {name}_connection TO POSTGRES (
                        HOST postgres,
                        DATABASE postgres,
                        USER postgres,
                        PASSWORD SECRET {name}_password
                      );

                    > CREATE SOURCE {name}_source
                      IN CLUSTER {self.cluster_name}
                      FROM POSTGRES CONNECTION {name}_connection (PUBLICATION '{name}_publication')
                      FOR TABLES ({self.postgres_cdc_table.postgres_table.name} AS {name})
                    """
                )
            )

    def provides(self) -> list[Capability]:
        """Return the new CDC table capability, or nothing if it already existed."""
        return [self.postgres_cdc_table] if self.new_postgres_cdc_table else []

Classes

class CreatePostgresCdcTable (capabilities: Capabilities)

Creates a Postgres CDC source in Materialized.

Construct a new action, possibly conditioning on the available capabilities.

Expand source code Browse git
class CreatePostgresCdcTable(Action):
    """Creates a Postgres CDC source in Materialized.

    A Postgres table is picked at random from the available
    ``PostgresTableExists`` capabilities and the CDC object is named
    ``postgres_<table name>``. If no ``PostgresCdcTableExists`` capability
    with that name is present yet, ``run`` creates the publication, secret,
    connection and source; otherwise the action does nothing and provides
    no new capability.
    """

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Return the capability types this action depends on."""
        return {
            BalancerdIsRunning,
            MzIsRunning,
            StoragedRunning,
            PostgresRunning,
            PostgresTableExists,
        }

    def __init__(self, capabilities: Capabilities) -> None:
        """Choose the upstream table and decide whether a new source is needed.

        Raises:
            AssertionError: if more than one ``PostgresCdcTableExists``
                capability already carries the derived name.
        """
        postgres_table = random.choice(capabilities.get(PostgresTableExists))
        postgres_pg_cdc_name = f"postgres_{postgres_table.name}"
        this_postgres_cdc_table = PostgresCdcTableExists(name=postgres_pg_cdc_name)
        # The cluster is drawn even when the CDC table already exists, so the
        # number of random draws does not depend on the branch taken below.
        cluster_name = random.choice(source_capable_clusters(capabilities))

        existing_postgres_cdc_tables = [
            s
            for s in capabilities.get(PostgresCdcTableExists)
            if s.name == postgres_pg_cdc_name
        ]

        if not existing_postgres_cdc_tables:
            self.new_postgres_cdc_table = True

            self.postgres_cdc_table = this_postgres_cdc_table
            self.postgres_cdc_table.postgres_table = postgres_table
            # Only set (and only read by run()) when a new source is created.
            self.cluster_name = cluster_name
        elif len(existing_postgres_cdc_tables) == 1:
            self.new_postgres_cdc_table = False

            self.postgres_cdc_table = existing_postgres_cdc_tables[0]
        else:
            # Raise explicitly instead of `assert False`: assert statements
            # are removed under `python -O`, which would leave this instance
            # half-initialized and fail later with an unrelated error.
            raise AssertionError(
                f"expected at most one PostgresCdcTableExists named "
                f"{postgres_pg_cdc_name!r}, found {len(existing_postgres_cdc_tables)}"
            )

        super().__init__(capabilities)

    def run(self, c: Composition) -> None:
        """Create the publication, secret, connection and source via testdrive.

        Does nothing when the CDC table was already present at construction
        time.
        """
        if self.new_postgres_cdc_table:
            assert self.postgres_cdc_table is not None
            assert self.postgres_cdc_table.postgres_table is not None
            name = self.postgres_cdc_table.name
            c.testdrive(
                dedent(
                    f"""
                    $ postgres-execute connection=postgres://postgres:postgres@postgres

                    CREATE PUBLICATION {name}_publication FOR TABLE {self.postgres_cdc_table.postgres_table.name};


                    > CREATE SECRET {name}_password AS 'postgres';
                    > CREATE CONNECTION {name}_connection TO POSTGRES (
                        HOST postgres,
                        DATABASE postgres,
                        USER postgres,
                        PASSWORD SECRET {name}_password
                      );

                    > CREATE SOURCE {name}_source
                      IN CLUSTER {self.cluster_name}
                      FROM POSTGRES CONNECTION {name}_connection (PUBLICATION '{name}_publication')
                      FOR TABLES ({self.postgres_cdc_table.postgres_table.name} AS {name})
                    """
                )
            )

    def provides(self) -> list[Capability]:
        """Return the new CDC table capability, or nothing if it already existed."""
        return [self.postgres_cdc_table] if self.new_postgres_cdc_table else []

Ancestors

Inherited members