Module materialize.checks.all_checks.alter_connection

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from __future__ import annotations

from enum import Enum
from random import Random
from textwrap import dedent

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check, externally_idempotent
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
from materialize.checks.executors import Executor
from materialize.mz_version import MzVersion


def schema() -> str:
    """Return `KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD` with its common leading indentation removed."""
    raw_schema = KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
    return dedent(raw_schema)


class SshChange(Enum):
    """How a check changes the SSH tunnel setup of its Kafka connections."""

    # Kafka connection is created without an SSH tunnel; ALTER CONNECTION adds one.
    ADD_SSH = 1
    # Kafka connection is created with an SSH tunnel; ALTER CONNECTION omits it.
    DROP_SSH = 2
    # Kafka connection keeps its SSH tunnel; the tunnel connection's HOST is altered.
    CHANGE_SSH_HOST = 3


# Templates carrying a literal `{i}` placeholder. They are plain strings (not
# f-strings); the check index is substituted later, e.g. via
# `WITH_SSH_SUFFIX.replace('{i}', str(i))` in the check methods below.
# NOTE(review): SHOW_CONNECTION_SSH_TUNNEL is not referenced in the code visible here.
SHOW_CONNECTION_SSH_TUNNEL = """materialize.public.ssh_tunnel_{i} "CREATE CONNECTION \\"materialize\\".\\"public\\".\\"ssh_tunnel_{i}\\" TO SSH TUNNEL (HOST = 'ssh-bastion-host', PORT = 22, USER = 'mz')" """
WITH_SSH_SUFFIX = "USING SSH TUNNEL ssh_tunnel_{i}"


class AlterConnectionSshChangeBase(Check):
    """Shared scenario for checks that alter the SSH tunnel usage of Kafka connections.

    Subclasses pick an `SshChange` and an `index`; the index is embedded in the
    name of every topic and database object so the variants do not collide.
    The scripts reference `ssh_tunnel_{index}` and `csr_conn`, which are not
    created in this class (presumably provided by the test setup).
    """

    def __init__(
        self,
        ssh_change: SshChange,
        index: int,
        base_version: MzVersion,
        rng: Random | None,
    ):
        super().__init__(base_version, rng)
        self.index = index
        self.ssh_change = ssh_change

    def _can_run(self, e: Executor) -> bool:
        """Return True only when the base version is at least v0.79.0-dev."""
        minimum_version = MzVersion.parse_mz("v0.79.0-dev")
        return self.base_version >= minimum_version

    def initialize(self) -> Testdrive:
        """Create the first Kafka connection, its source, and a table/view/sink chain."""
        i = self.index

        # DROP_SSH and CHANGE_SSH_HOST start out with the tunnel attached.
        starts_with_tunnel = self.ssh_change in {
            SshChange.DROP_SSH,
            SshChange.CHANGE_SSH_HOST,
        }
        create_suffix = (
            WITH_SSH_SUFFIX.replace("{i}", str(i)) if starts_with_tunnel else ""
        )
        # Expected answer to "does the connection's create_sql mention a tunnel?"
        tunnel_expected = "false" if self.ssh_change == SshChange.ADD_SSH else "true"

        script = dedent(
            f"""
            $[version>=5500] postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
            ALTER SYSTEM SET enable_table_keys = true;

            $ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
            ALTER SYSTEM SET enable_connection_validation_syntax = true

            $ kafka-create-topic topic=alter-connection-{i}a

            $ kafka-ingest topic=alter-connection-{i}a format=bytes
            one

            > CREATE CONNECTION kafka_conn_alter_connection_{i}a
              TO KAFKA (SECURITY PROTOCOL = "plaintext", BROKER '${{testdrive.kafka-addr}}' {create_suffix});

            > CREATE SOURCE alter_connection_source_{i}a
              FROM KAFKA CONNECTION kafka_conn_alter_connection_{i}a (TOPIC 'testdrive-alter-connection-{i}a-${{testdrive.seed}}')
              FORMAT TEXT
              ENVELOPE NONE;

            > SELECT count(regexp_match(create_sql, 'USING SSH TUNNEL')) > 0 FROM (SHOW CREATE CONNECTION kafka_conn_alter_connection_{i}a);
            {tunnel_expected}

            > SELECT count(regexp_match(create_sql, 'ssh-bastion-host')) > 0 FROM (SHOW CREATE CONNECTION ssh_tunnel_{i});
            true

            > CREATE TABLE alter_connection_table_{i} (f1 INTEGER, PRIMARY KEY (f1));
            > INSERT INTO alter_connection_table_{i} VALUES (1);

            > CREATE MATERIALIZED VIEW mv_alter_connection_{i} AS SELECT f1 FROM alter_connection_table_{i};

            > CREATE SINK alter_connection_sink_{i} FROM mv_alter_connection_{i}
              INTO KAFKA CONNECTION kafka_conn_alter_connection_{i}a (TOPIC 'testdrive-alter-connection-sink-{i}-${{testdrive.seed}}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE DEBEZIUM;

            $ kafka-verify-topic sink=materialize.public.alter_connection_sink_{i} await-value-schema=true
            """
        )
        return Testdrive(schema() + script)

    def manipulate(self) -> list[Testdrive]:
        """Return two phases that alter connection `a`, create connection `b`, then alter `b`."""
        i = self.index

        tunnel_suffix = WITH_SSH_SUFFIX.replace("{i}", str(i))
        # Suffix used when altering an existing connection.
        ends_with_tunnel = self.ssh_change in {
            SshChange.ADD_SSH,
            SshChange.CHANGE_SSH_HOST,
        }
        alter_suffix = tunnel_suffix if ends_with_tunnel else ""
        # Suffix used when creating a new connection.
        starts_with_tunnel = self.ssh_change in {
            SshChange.DROP_SSH,
            SshChange.CHANGE_SSH_HOST,
        }
        create_suffix = tunnel_suffix if starts_with_tunnel else ""
        # Only the host-change variant alters the tunnel connection itself.
        if self.ssh_change == SshChange.CHANGE_SSH_HOST:
            tunnel_host_step = f"> ALTER CONNECTION ssh_tunnel_{i} SET (HOST = 'other_ssh_bastion') WITH (VALIDATE = true);"
        else:
            tunnel_host_step = "$ nop"

        first_phase = f"""
            $ kafka-ingest topic=alter-connection-{i}a format=bytes
            two

            > ALTER CONNECTION kafka_conn_alter_connection_{i}a SET (BROKER '${{testdrive.kafka-addr}}' {alter_suffix});

            $ kafka-ingest topic=alter-connection-{i}a format=bytes
            three

            $ kafka-create-topic topic=alter-connection-{i}b

            $ kafka-ingest topic=alter-connection-{i}b format=bytes
            ten

            > CREATE CONNECTION kafka_conn_alter_connection_{i}b
              TO KAFKA (SECURITY PROTOCOL = "plaintext", BROKER '${{testdrive.kafka-addr}}' {create_suffix});

            > CREATE SOURCE alter_connection_source_{i}b
              FROM KAFKA CONNECTION kafka_conn_alter_connection_{i}b (TOPIC 'testdrive-alter-connection-{i}b-${{testdrive.seed}}')
              FORMAT TEXT
              ENVELOPE NONE;

            $ kafka-ingest topic=alter-connection-{i}b format=bytes
            twenty

            {tunnel_host_step}

            > INSERT INTO alter_connection_table_{i} VALUES (2);
            """

        second_phase = f"""
            $ kafka-ingest topic=alter-connection-{i}a format=bytes
            four

            $ kafka-ingest topic=alter-connection-{i}b format=bytes
            thirty

            > ALTER CONNECTION kafka_conn_alter_connection_{i}b SET (BROKER '${{testdrive.kafka-addr}}' {alter_suffix});

            $ kafka-ingest topic=alter-connection-{i}b format=bytes
            fourty

            > INSERT INTO alter_connection_table_{i} VALUES (3);
            """

        return [
            Testdrive(schema() + dedent(first_phase)),
            Testdrive(schema() + dedent(second_phase)),
        ]

    def validate(self) -> Testdrive:
        """Check the final connection definitions and the ingested source, table and sink data."""
        i = self.index

        if self.ssh_change == SshChange.CHANGE_SSH_HOST:
            expected_host = "{other_ssh_bastion}"
        else:
            expected_host = "{ssh-bastion-host}"

        ends_with_tunnel = self.ssh_change in {
            SshChange.ADD_SSH,
            SshChange.CHANGE_SSH_HOST,
        }
        tunnel_expected = "true" if ends_with_tunnel else "false"

        script = dedent(
            f"""
            > SELECT regexp_match(create_sql, '(ssh-bastion-host|other_ssh_bastion)') FROM (SHOW CREATE CONNECTION ssh_tunnel_{i});
            {expected_host}

            > SELECT count(regexp_match(create_sql, 'USING SSH TUNNEL')) > 0 FROM (SHOW CREATE CONNECTION kafka_conn_alter_connection_{i}a);
            {tunnel_expected}

            > SELECT * FROM alter_connection_source_{i}a;
            one
            two
            three
            four

            > SELECT * FROM alter_connection_source_{i}b;
            ten
            twenty
            thirty
            fourty

            > DROP SOURCE IF EXISTS alter_connection_sink_source_{i}

            > CREATE SOURCE alter_connection_sink_source_{i}
              FROM KAFKA CONNECTION kafka_conn_alter_connection_{i}a (TOPIC 'testdrive-alter-connection-sink-{i}-${{testdrive.seed}}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE NONE;

            # Table has expected data
            > SELECT * FROM alter_connection_table_{i};
            1
            2
            3

            # TODO: Reenable this check when kafka-verify-data can deal with validate being run twice
            # Sink Kafka topic has expected data
            # $ kafka-verify-data format=avro sink=materialize.public.alter_connection_sink_{i} sort-messages=true
            # {{"before": null, "after": {{"row":{{"f1": 1}}}}}}
            # {{"before": null, "after": {{"row":{{"f1": 2}}}}}}
            # {{"before": null, "after": {{"row":{{"f1": 3}}}}}}

            # Source based on sink topic has ingested data; data must be text-formatted because records are not
            # supported in testdrive
            > SELECT before::text, after::text FROM alter_connection_sink_source_{i};
            <null> (1)
            <null> (2)
            <null> (3)
            """
        )
        return Testdrive(script)


@externally_idempotent(False)
class AlterConnectionToSsh(AlterConnectionSshChangeBase):
    """Variant with index 1 that runs the shared scenario with `SshChange.ADD_SSH`."""

    def __init__(self, base_version: MzVersion, rng: Random | None):
        super().__init__(
            ssh_change=SshChange.ADD_SSH,
            index=1,
            base_version=base_version,
            rng=rng,
        )


@externally_idempotent(False)
class AlterConnectionToNonSsh(AlterConnectionSshChangeBase):
    """Variant with index 2 that runs the shared scenario with `SshChange.DROP_SSH`."""

    def __init__(self, base_version: MzVersion, rng: Random | None):
        super().__init__(
            ssh_change=SshChange.DROP_SSH,
            index=2,
            base_version=base_version,
            rng=rng,
        )


@externally_idempotent(False)
class AlterConnectionHost(AlterConnectionSshChangeBase):
    """Variant with index 3 that runs the shared scenario with `SshChange.CHANGE_SSH_HOST`."""

    def __init__(self, base_version: MzVersion, rng: Random | None):
        super().__init__(
            ssh_change=SshChange.CHANGE_SSH_HOST,
            index=3,
            base_version=base_version,
            rng=rng,
        )

Functions

def schema() ‑> str
Expand source code Browse git
def schema() -> str:
    """Return `KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD` with its common leading indentation removed."""
    raw_schema = KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
    return dedent(raw_schema)

Classes

class AlterConnectionHost (base_version: MzVersion, rng: Random | None)
Expand source code Browse git
@externally_idempotent(False)
class AlterConnectionHost(AlterConnectionSshChangeBase):
    """Variant with index 3 that runs the shared scenario with `SshChange.CHANGE_SSH_HOST`."""

    def __init__(self, base_version: MzVersion, rng: Random | None):
        super().__init__(
            ssh_change=SshChange.CHANGE_SSH_HOST,
            index=3,
            base_version=base_version,
            rng=rng,
        )

Ancestors

Class variables

var externally_idempotent : bool
class AlterConnectionSshChangeBase (ssh_change: SshChange, index: int, base_version: MzVersion, rng: Random | None)
Expand source code Browse git
class AlterConnectionSshChangeBase(Check):
    """Shared scenario for checks that alter the SSH tunnel usage of Kafka connections.

    Subclasses pick an `SshChange` and an `index`; the index is embedded in the
    name of every topic and database object so the variants do not collide.
    The scripts reference `ssh_tunnel_{index}` and `csr_conn`, which are not
    created in this class (presumably provided by the test setup).
    """

    def __init__(
        self,
        ssh_change: SshChange,
        index: int,
        base_version: MzVersion,
        rng: Random | None,
    ):
        super().__init__(base_version, rng)
        self.index = index
        self.ssh_change = ssh_change

    def _can_run(self, e: Executor) -> bool:
        """Return True only when the base version is at least v0.79.0-dev."""
        minimum_version = MzVersion.parse_mz("v0.79.0-dev")
        return self.base_version >= minimum_version

    def initialize(self) -> Testdrive:
        """Create the first Kafka connection, its source, and a table/view/sink chain."""
        i = self.index

        # DROP_SSH and CHANGE_SSH_HOST start out with the tunnel attached.
        starts_with_tunnel = self.ssh_change in {
            SshChange.DROP_SSH,
            SshChange.CHANGE_SSH_HOST,
        }
        create_suffix = (
            WITH_SSH_SUFFIX.replace("{i}", str(i)) if starts_with_tunnel else ""
        )
        # Expected answer to "does the connection's create_sql mention a tunnel?"
        tunnel_expected = "false" if self.ssh_change == SshChange.ADD_SSH else "true"

        script = dedent(
            f"""
            $[version>=5500] postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
            ALTER SYSTEM SET enable_table_keys = true;

            $ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
            ALTER SYSTEM SET enable_connection_validation_syntax = true

            $ kafka-create-topic topic=alter-connection-{i}a

            $ kafka-ingest topic=alter-connection-{i}a format=bytes
            one

            > CREATE CONNECTION kafka_conn_alter_connection_{i}a
              TO KAFKA (SECURITY PROTOCOL = "plaintext", BROKER '${{testdrive.kafka-addr}}' {create_suffix});

            > CREATE SOURCE alter_connection_source_{i}a
              FROM KAFKA CONNECTION kafka_conn_alter_connection_{i}a (TOPIC 'testdrive-alter-connection-{i}a-${{testdrive.seed}}')
              FORMAT TEXT
              ENVELOPE NONE;

            > SELECT count(regexp_match(create_sql, 'USING SSH TUNNEL')) > 0 FROM (SHOW CREATE CONNECTION kafka_conn_alter_connection_{i}a);
            {tunnel_expected}

            > SELECT count(regexp_match(create_sql, 'ssh-bastion-host')) > 0 FROM (SHOW CREATE CONNECTION ssh_tunnel_{i});
            true

            > CREATE TABLE alter_connection_table_{i} (f1 INTEGER, PRIMARY KEY (f1));
            > INSERT INTO alter_connection_table_{i} VALUES (1);

            > CREATE MATERIALIZED VIEW mv_alter_connection_{i} AS SELECT f1 FROM alter_connection_table_{i};

            > CREATE SINK alter_connection_sink_{i} FROM mv_alter_connection_{i}
              INTO KAFKA CONNECTION kafka_conn_alter_connection_{i}a (TOPIC 'testdrive-alter-connection-sink-{i}-${{testdrive.seed}}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE DEBEZIUM;

            $ kafka-verify-topic sink=materialize.public.alter_connection_sink_{i} await-value-schema=true
            """
        )
        return Testdrive(schema() + script)

    def manipulate(self) -> list[Testdrive]:
        """Return two phases that alter connection `a`, create connection `b`, then alter `b`."""
        i = self.index

        tunnel_suffix = WITH_SSH_SUFFIX.replace("{i}", str(i))
        # Suffix used when altering an existing connection.
        ends_with_tunnel = self.ssh_change in {
            SshChange.ADD_SSH,
            SshChange.CHANGE_SSH_HOST,
        }
        alter_suffix = tunnel_suffix if ends_with_tunnel else ""
        # Suffix used when creating a new connection.
        starts_with_tunnel = self.ssh_change in {
            SshChange.DROP_SSH,
            SshChange.CHANGE_SSH_HOST,
        }
        create_suffix = tunnel_suffix if starts_with_tunnel else ""
        # Only the host-change variant alters the tunnel connection itself.
        if self.ssh_change == SshChange.CHANGE_SSH_HOST:
            tunnel_host_step = f"> ALTER CONNECTION ssh_tunnel_{i} SET (HOST = 'other_ssh_bastion') WITH (VALIDATE = true);"
        else:
            tunnel_host_step = "$ nop"

        first_phase = f"""
            $ kafka-ingest topic=alter-connection-{i}a format=bytes
            two

            > ALTER CONNECTION kafka_conn_alter_connection_{i}a SET (BROKER '${{testdrive.kafka-addr}}' {alter_suffix});

            $ kafka-ingest topic=alter-connection-{i}a format=bytes
            three

            $ kafka-create-topic topic=alter-connection-{i}b

            $ kafka-ingest topic=alter-connection-{i}b format=bytes
            ten

            > CREATE CONNECTION kafka_conn_alter_connection_{i}b
              TO KAFKA (SECURITY PROTOCOL = "plaintext", BROKER '${{testdrive.kafka-addr}}' {create_suffix});

            > CREATE SOURCE alter_connection_source_{i}b
              FROM KAFKA CONNECTION kafka_conn_alter_connection_{i}b (TOPIC 'testdrive-alter-connection-{i}b-${{testdrive.seed}}')
              FORMAT TEXT
              ENVELOPE NONE;

            $ kafka-ingest topic=alter-connection-{i}b format=bytes
            twenty

            {tunnel_host_step}

            > INSERT INTO alter_connection_table_{i} VALUES (2);
            """

        second_phase = f"""
            $ kafka-ingest topic=alter-connection-{i}a format=bytes
            four

            $ kafka-ingest topic=alter-connection-{i}b format=bytes
            thirty

            > ALTER CONNECTION kafka_conn_alter_connection_{i}b SET (BROKER '${{testdrive.kafka-addr}}' {alter_suffix});

            $ kafka-ingest topic=alter-connection-{i}b format=bytes
            fourty

            > INSERT INTO alter_connection_table_{i} VALUES (3);
            """

        return [
            Testdrive(schema() + dedent(first_phase)),
            Testdrive(schema() + dedent(second_phase)),
        ]

    def validate(self) -> Testdrive:
        """Check the final connection definitions and the ingested source, table and sink data."""
        i = self.index

        if self.ssh_change == SshChange.CHANGE_SSH_HOST:
            expected_host = "{other_ssh_bastion}"
        else:
            expected_host = "{ssh-bastion-host}"

        ends_with_tunnel = self.ssh_change in {
            SshChange.ADD_SSH,
            SshChange.CHANGE_SSH_HOST,
        }
        tunnel_expected = "true" if ends_with_tunnel else "false"

        script = dedent(
            f"""
            > SELECT regexp_match(create_sql, '(ssh-bastion-host|other_ssh_bastion)') FROM (SHOW CREATE CONNECTION ssh_tunnel_{i});
            {expected_host}

            > SELECT count(regexp_match(create_sql, 'USING SSH TUNNEL')) > 0 FROM (SHOW CREATE CONNECTION kafka_conn_alter_connection_{i}a);
            {tunnel_expected}

            > SELECT * FROM alter_connection_source_{i}a;
            one
            two
            three
            four

            > SELECT * FROM alter_connection_source_{i}b;
            ten
            twenty
            thirty
            fourty

            > DROP SOURCE IF EXISTS alter_connection_sink_source_{i}

            > CREATE SOURCE alter_connection_sink_source_{i}
              FROM KAFKA CONNECTION kafka_conn_alter_connection_{i}a (TOPIC 'testdrive-alter-connection-sink-{i}-${{testdrive.seed}}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE NONE;

            # Table has expected data
            > SELECT * FROM alter_connection_table_{i};
            1
            2
            3

            # TODO: Reenable this check when kafka-verify-data can deal with validate being run twice
            # Sink Kafka topic has expected data
            # $ kafka-verify-data format=avro sink=materialize.public.alter_connection_sink_{i} sort-messages=true
            # {{"before": null, "after": {{"row":{{"f1": 1}}}}}}
            # {{"before": null, "after": {{"row":{{"f1": 2}}}}}}
            # {{"before": null, "after": {{"row":{{"f1": 3}}}}}}

            # Source based on sink topic has ingested data; data must be text-formatted because records are not
            # supported in testdrive
            > SELECT before::text, after::text FROM alter_connection_sink_source_{i};
            <null> (1)
            <null> (2)
            <null> (3)
            """
        )
        return Testdrive(script)

Ancestors

Subclasses

Methods

def initialize(self) ‑> Testdrive
Expand source code Browse git
def initialize(self) -> Testdrive:
    """Create the first Kafka connection, its source, and a table/view/sink chain."""
    i = self.index

    # DROP_SSH and CHANGE_SSH_HOST start out with the tunnel attached.
    starts_with_tunnel = self.ssh_change in {
        SshChange.DROP_SSH,
        SshChange.CHANGE_SSH_HOST,
    }
    create_suffix = (
        WITH_SSH_SUFFIX.replace("{i}", str(i)) if starts_with_tunnel else ""
    )
    # Expected answer to "does the connection's create_sql mention a tunnel?"
    tunnel_expected = "false" if self.ssh_change == SshChange.ADD_SSH else "true"

    script = dedent(
        f"""
        $[version>=5500] postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
        ALTER SYSTEM SET enable_table_keys = true;

        $ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
        ALTER SYSTEM SET enable_connection_validation_syntax = true

        $ kafka-create-topic topic=alter-connection-{i}a

        $ kafka-ingest topic=alter-connection-{i}a format=bytes
        one

        > CREATE CONNECTION kafka_conn_alter_connection_{i}a
          TO KAFKA (SECURITY PROTOCOL = "plaintext", BROKER '${{testdrive.kafka-addr}}' {create_suffix});

        > CREATE SOURCE alter_connection_source_{i}a
          FROM KAFKA CONNECTION kafka_conn_alter_connection_{i}a (TOPIC 'testdrive-alter-connection-{i}a-${{testdrive.seed}}')
          FORMAT TEXT
          ENVELOPE NONE;

        > SELECT count(regexp_match(create_sql, 'USING SSH TUNNEL')) > 0 FROM (SHOW CREATE CONNECTION kafka_conn_alter_connection_{i}a);
        {tunnel_expected}

        > SELECT count(regexp_match(create_sql, 'ssh-bastion-host')) > 0 FROM (SHOW CREATE CONNECTION ssh_tunnel_{i});
        true

        > CREATE TABLE alter_connection_table_{i} (f1 INTEGER, PRIMARY KEY (f1));
        > INSERT INTO alter_connection_table_{i} VALUES (1);

        > CREATE MATERIALIZED VIEW mv_alter_connection_{i} AS SELECT f1 FROM alter_connection_table_{i};

        > CREATE SINK alter_connection_sink_{i} FROM mv_alter_connection_{i}
          INTO KAFKA CONNECTION kafka_conn_alter_connection_{i}a (TOPIC 'testdrive-alter-connection-sink-{i}-${{testdrive.seed}}')
          FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
          ENVELOPE DEBEZIUM;

        $ kafka-verify-topic sink=materialize.public.alter_connection_sink_{i} await-value-schema=true
        """
    )
    return Testdrive(schema() + script)
def manipulate(self) ‑> list[Testdrive]
Expand source code Browse git
def manipulate(self) -> list[Testdrive]:
    """Return two phases that alter connection `a`, create connection `b`, then alter `b`."""
    i = self.index

    tunnel_suffix = WITH_SSH_SUFFIX.replace("{i}", str(i))
    # Suffix used when altering an existing connection.
    ends_with_tunnel = self.ssh_change in {
        SshChange.ADD_SSH,
        SshChange.CHANGE_SSH_HOST,
    }
    alter_suffix = tunnel_suffix if ends_with_tunnel else ""
    # Suffix used when creating a new connection.
    starts_with_tunnel = self.ssh_change in {
        SshChange.DROP_SSH,
        SshChange.CHANGE_SSH_HOST,
    }
    create_suffix = tunnel_suffix if starts_with_tunnel else ""
    # Only the host-change variant alters the tunnel connection itself.
    if self.ssh_change == SshChange.CHANGE_SSH_HOST:
        tunnel_host_step = f"> ALTER CONNECTION ssh_tunnel_{i} SET (HOST = 'other_ssh_bastion') WITH (VALIDATE = true);"
    else:
        tunnel_host_step = "$ nop"

    first_phase = f"""
        $ kafka-ingest topic=alter-connection-{i}a format=bytes
        two

        > ALTER CONNECTION kafka_conn_alter_connection_{i}a SET (BROKER '${{testdrive.kafka-addr}}' {alter_suffix});

        $ kafka-ingest topic=alter-connection-{i}a format=bytes
        three

        $ kafka-create-topic topic=alter-connection-{i}b

        $ kafka-ingest topic=alter-connection-{i}b format=bytes
        ten

        > CREATE CONNECTION kafka_conn_alter_connection_{i}b
          TO KAFKA (SECURITY PROTOCOL = "plaintext", BROKER '${{testdrive.kafka-addr}}' {create_suffix});

        > CREATE SOURCE alter_connection_source_{i}b
          FROM KAFKA CONNECTION kafka_conn_alter_connection_{i}b (TOPIC 'testdrive-alter-connection-{i}b-${{testdrive.seed}}')
          FORMAT TEXT
          ENVELOPE NONE;

        $ kafka-ingest topic=alter-connection-{i}b format=bytes
        twenty

        {tunnel_host_step}

        > INSERT INTO alter_connection_table_{i} VALUES (2);
        """

    second_phase = f"""
        $ kafka-ingest topic=alter-connection-{i}a format=bytes
        four

        $ kafka-ingest topic=alter-connection-{i}b format=bytes
        thirty

        > ALTER CONNECTION kafka_conn_alter_connection_{i}b SET (BROKER '${{testdrive.kafka-addr}}' {alter_suffix});

        $ kafka-ingest topic=alter-connection-{i}b format=bytes
        fourty

        > INSERT INTO alter_connection_table_{i} VALUES (3);
        """

    return [
        Testdrive(schema() + dedent(first_phase)),
        Testdrive(schema() + dedent(second_phase)),
    ]
def validate(self) ‑> Testdrive
Expand source code Browse git
def validate(self) -> Testdrive:
    """Check the final connection definitions and the ingested source, table and sink data."""
    i = self.index

    if self.ssh_change == SshChange.CHANGE_SSH_HOST:
        expected_host = "{other_ssh_bastion}"
    else:
        expected_host = "{ssh-bastion-host}"

    ends_with_tunnel = self.ssh_change in {
        SshChange.ADD_SSH,
        SshChange.CHANGE_SSH_HOST,
    }
    tunnel_expected = "true" if ends_with_tunnel else "false"

    script = dedent(
        f"""
        > SELECT regexp_match(create_sql, '(ssh-bastion-host|other_ssh_bastion)') FROM (SHOW CREATE CONNECTION ssh_tunnel_{i});
        {expected_host}

        > SELECT count(regexp_match(create_sql, 'USING SSH TUNNEL')) > 0 FROM (SHOW CREATE CONNECTION kafka_conn_alter_connection_{i}a);
        {tunnel_expected}

        > SELECT * FROM alter_connection_source_{i}a;
        one
        two
        three
        four

        > SELECT * FROM alter_connection_source_{i}b;
        ten
        twenty
        thirty
        fourty

        > DROP SOURCE IF EXISTS alter_connection_sink_source_{i}

        > CREATE SOURCE alter_connection_sink_source_{i}
          FROM KAFKA CONNECTION kafka_conn_alter_connection_{i}a (TOPIC 'testdrive-alter-connection-sink-{i}-${{testdrive.seed}}')
          FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
          ENVELOPE NONE;

        # Table has expected data
        > SELECT * FROM alter_connection_table_{i};
        1
        2
        3

        # TODO: Reenable this check when kafka-verify-data can deal with validate being run twice
        # Sink Kafka topic has expected data
        # $ kafka-verify-data format=avro sink=materialize.public.alter_connection_sink_{i} sort-messages=true
        # {{"before": null, "after": {{"row":{{"f1": 1}}}}}}
        # {{"before": null, "after": {{"row":{{"f1": 2}}}}}}
        # {{"before": null, "after": {{"row":{{"f1": 3}}}}}}

        # Source based on sink topic has ingested data; data must be text-formatted because records are not
        # supported in testdrive
        > SELECT before::text, after::text FROM alter_connection_sink_source_{i};
        <null> (1)
        <null> (2)
        <null> (3)
        """
    )
    return Testdrive(script)
class AlterConnectionToNonSsh (base_version: MzVersion, rng: Random | None)
Expand source code Browse git
@externally_idempotent(False)
class AlterConnectionToNonSsh(AlterConnectionSshChangeBase):
    """Variant with index 2 that runs the shared scenario with `SshChange.DROP_SSH`."""

    def __init__(self, base_version: MzVersion, rng: Random | None):
        super().__init__(
            ssh_change=SshChange.DROP_SSH,
            index=2,
            base_version=base_version,
            rng=rng,
        )

Ancestors

Class variables

var externally_idempotent : bool
class AlterConnectionToSsh (base_version: MzVersion, rng: Random | None)
Expand source code Browse git
@externally_idempotent(False)
class AlterConnectionToSsh(AlterConnectionSshChangeBase):
    """Variant with index 1 that runs the shared scenario with `SshChange.ADD_SSH`."""

    def __init__(self, base_version: MzVersion, rng: Random | None):
        super().__init__(
            ssh_change=SshChange.ADD_SSH,
            index=1,
            base_version=base_version,
            rng=rng,
        )

Ancestors

Class variables

var externally_idempotent : bool
class SshChange (*args, **kwds)

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access:
>>> Color.RED
<Color.RED: 1>
  • value lookup:
>>> Color(1)
<Color.RED: 1>
  • name lookup:
>>> Color['RED']
<Color.RED: 1>

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Expand source code Browse git
class SshChange(Enum):
    """How a check changes the SSH tunnel setup of its Kafka connections."""

    # Kafka connection is created without an SSH tunnel; ALTER CONNECTION adds one.
    ADD_SSH = 1
    # Kafka connection is created with an SSH tunnel; ALTER CONNECTION omits it.
    DROP_SSH = 2
    # Kafka connection keeps its SSH tunnel; the tunnel connection's HOST is altered.
    CHANGE_SSH_HOST = 3

Ancestors

  • enum.Enum

Class variables

var ADD_SSH
var CHANGE_SSH_HOST
var DROP_SSH