Module materialize.checks.all_checks.ssh

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from textwrap import dedent

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check, externally_idempotent
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD


def schemas() -> str:
    return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)


@externally_idempotent(False)
class SshPg(Check):
    """
    Testing Postgres CDC source with SSH tunnel
    """

    def initialize(self) -> Testdrive:
        return Testdrive(
            schemas()
            + dedent(
                """
                > CREATE SECRET pgpass AS 'postgres'

                > CREATE CONNECTION pg_ssh1 TO POSTGRES (
                  HOST postgres,
                  DATABASE postgres,
                  USER postgres,
                  PASSWORD SECRET pgpass,
                  SSL MODE require,
                  SSH TUNNEL ssh_tunnel_0);

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                ALTER USER postgres WITH replication;
                CREATE TABLE t_ssh1 (f1 INTEGER);
                ALTER TABLE t_ssh1 REPLICA IDENTITY FULL;
                CREATE TABLE t_ssh2 (f1 INTEGER);
                ALTER TABLE t_ssh2 REPLICA IDENTITY FULL;
                CREATE TABLE t_ssh3 (f1 INTEGER);
                ALTER TABLE t_ssh3 REPLICA IDENTITY FULL;
                CREATE PUBLICATION mz_source_ssh FOR ALL TABLES;
                INSERT INTO t_ssh1 VALUES (1), (2), (3), (4), (5);

                > CREATE SOURCE mz_source_ssh1
                  FROM POSTGRES CONNECTION pg_ssh1
                  (PUBLICATION 'mz_source_ssh')
                  FOR TABLES (t_ssh1);
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(schemas() + dedent(s))
            for s in [
                """
                > CREATE CONNECTION pg_ssh2 TO POSTGRES (
                  HOST postgres,
                  DATABASE postgres,
                  USER postgres,
                  PASSWORD SECRET pgpass,
                  SSL MODE require,
                  SSH TUNNEL ssh_tunnel_0);

                > CREATE SOURCE mz_source_ssh2
                  FROM POSTGRES CONNECTION pg_ssh2
                  (PUBLICATION 'mz_source_ssh')
                  FOR TABLES (t_ssh2);

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO t_ssh1 VALUES (6), (7), (8), (9), (10);

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO t_ssh2 VALUES (6), (7), (8), (9), (10);
                """,
                """
                > CREATE CONNECTION pg_ssh3 TO POSTGRES (
                  HOST postgres,
                  DATABASE postgres,
                  USER postgres,
                  PASSWORD SECRET pgpass,
                  SSL MODE require,
                  SSH TUNNEL ssh_tunnel_0);

                > CREATE SOURCE mz_source_ssh3
                  FROM POSTGRES CONNECTION pg_ssh3
                  (PUBLICATION 'mz_source_ssh')
                  FOR TABLES (t_ssh3);

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO t_ssh1 VALUES (11), (12), (13), (14), (15);

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO t_ssh2 VALUES (11), (12), (13), (14), (15);

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO t_ssh3 VALUES (11), (12), (13), (14), (15);
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                > SELECT COUNT(*) FROM t_ssh1;
                15

                > SELECT COUNT(*) FROM t_ssh2;
                10

                > SELECT COUNT(*) FROM t_ssh3;
                5
           """
            )
        )


@externally_idempotent(False)
class SshKafka(Check):
    """
    Testing Kafka source with SSH tunnel
    """

    def initialize(self) -> Testdrive:
        return Testdrive(
            schemas()
            + dedent(
                """
                $ kafka-create-topic topic=ssh1

                $ kafka-create-topic topic=ssh2

                $ kafka-create-topic topic=ssh3

                $ kafka-ingest topic=ssh1 format=bytes
                one

                >[version<7800] CREATE CONNECTION kafka_conn_ssh1
                  TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0);
                >[version>=7800] CREATE CONNECTION kafka_conn_ssh1
                  TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0, SECURITY PROTOCOL PLAINTEXT);

                > CREATE SOURCE ssh1
                  FROM KAFKA CONNECTION kafka_conn_ssh1 (TOPIC 'testdrive-ssh1-${testdrive.seed}')
                  FORMAT TEXT
                  ENVELOPE NONE;
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(schemas() + dedent(s))
            for s in [
                """
                >[version<7800] CREATE CONNECTION kafka_conn_ssh2
                  TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0);
                >[version>=7800] CREATE CONNECTION kafka_conn_ssh2
                  TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0, SECURITY PROTOCOL PLAINTEXT);

                > CREATE SOURCE ssh2
                  FROM KAFKA CONNECTION kafka_conn_ssh2 (TOPIC 'testdrive-ssh2-${testdrive.seed}')
                  FORMAT TEXT
                  ENVELOPE NONE;

                $ kafka-ingest topic=ssh1 format=bytes
                two

                $ kafka-ingest topic=ssh2 format=bytes
                two
                """,
                """
                >[version<7800] CREATE CONNECTION kafka_conn_ssh3
                  TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0);
                >[version>=7800] CREATE CONNECTION kafka_conn_ssh3
                  TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0, SECURITY PROTOCOL PLAINTEXT);

                > CREATE SOURCE ssh3
                  FROM KAFKA CONNECTION kafka_conn_ssh3 (TOPIC 'testdrive-ssh3-${testdrive.seed}')
                  FORMAT TEXT
                  ENVELOPE NONE;

                $ kafka-ingest topic=ssh1 format=bytes
                three

                $ kafka-ingest topic=ssh2 format=bytes
                three

                $ kafka-ingest topic=ssh3 format=bytes
                three
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                > SELECT * FROM ssh1;
                one
                two
                three

                > SELECT * FROM ssh2;
                two
                three

                > SELECT * FROM ssh3;
                three
           """
            )
        )

Functions

def schemas() ‑> str
Expand source code Browse git
def schemas() -> str:
    return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)

Classes

class SshKafka (base_version: MzVersion, rng: random.Random | None)

Testing Kafka source with SSH tunnel

Expand source code Browse git
@externally_idempotent(False)
class SshKafka(Check):
    """
    Testing Kafka source with SSH tunnel
    """

    def initialize(self) -> Testdrive:
        return Testdrive(
            schemas()
            + dedent(
                """
                $ kafka-create-topic topic=ssh1

                $ kafka-create-topic topic=ssh2

                $ kafka-create-topic topic=ssh3

                $ kafka-ingest topic=ssh1 format=bytes
                one

                >[version<7800] CREATE CONNECTION kafka_conn_ssh1
                  TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0);
                >[version>=7800] CREATE CONNECTION kafka_conn_ssh1
                  TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0, SECURITY PROTOCOL PLAINTEXT);

                > CREATE SOURCE ssh1
                  FROM KAFKA CONNECTION kafka_conn_ssh1 (TOPIC 'testdrive-ssh1-${testdrive.seed}')
                  FORMAT TEXT
                  ENVELOPE NONE;
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(schemas() + dedent(s))
            for s in [
                """
                >[version<7800] CREATE CONNECTION kafka_conn_ssh2
                  TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0);
                >[version>=7800] CREATE CONNECTION kafka_conn_ssh2
                  TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0, SECURITY PROTOCOL PLAINTEXT);

                > CREATE SOURCE ssh2
                  FROM KAFKA CONNECTION kafka_conn_ssh2 (TOPIC 'testdrive-ssh2-${testdrive.seed}')
                  FORMAT TEXT
                  ENVELOPE NONE;

                $ kafka-ingest topic=ssh1 format=bytes
                two

                $ kafka-ingest topic=ssh2 format=bytes
                two
                """,
                """
                >[version<7800] CREATE CONNECTION kafka_conn_ssh3
                  TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0);
                >[version>=7800] CREATE CONNECTION kafka_conn_ssh3
                  TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0, SECURITY PROTOCOL PLAINTEXT);

                > CREATE SOURCE ssh3
                  FROM KAFKA CONNECTION kafka_conn_ssh3 (TOPIC 'testdrive-ssh3-${testdrive.seed}')
                  FORMAT TEXT
                  ENVELOPE NONE;

                $ kafka-ingest topic=ssh1 format=bytes
                three

                $ kafka-ingest topic=ssh2 format=bytes
                three

                $ kafka-ingest topic=ssh3 format=bytes
                three
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                > SELECT * FROM ssh1;
                one
                two
                three

                > SELECT * FROM ssh2;
                two
                three

                > SELECT * FROM ssh3;
                three
           """
            )
        )

Ancestors

Class variables

var externally_idempotent : bool

Methods

def initialize(self) ‑> Testdrive
Expand source code Browse git
def initialize(self) -> Testdrive:
    return Testdrive(
        schemas()
        + dedent(
            """
            $ kafka-create-topic topic=ssh1

            $ kafka-create-topic topic=ssh2

            $ kafka-create-topic topic=ssh3

            $ kafka-ingest topic=ssh1 format=bytes
            one

            >[version<7800] CREATE CONNECTION kafka_conn_ssh1
              TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0);
            >[version>=7800] CREATE CONNECTION kafka_conn_ssh1
              TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0, SECURITY PROTOCOL PLAINTEXT);

            > CREATE SOURCE ssh1
              FROM KAFKA CONNECTION kafka_conn_ssh1 (TOPIC 'testdrive-ssh1-${testdrive.seed}')
              FORMAT TEXT
              ENVELOPE NONE;
            """
        )
    )
def manipulate(self) ‑> list[Testdrive]
Expand source code Browse git
def manipulate(self) -> list[Testdrive]:
    return [
        Testdrive(schemas() + dedent(s))
        for s in [
            """
            >[version<7800] CREATE CONNECTION kafka_conn_ssh2
              TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0);
            >[version>=7800] CREATE CONNECTION kafka_conn_ssh2
              TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0, SECURITY PROTOCOL PLAINTEXT);

            > CREATE SOURCE ssh2
              FROM KAFKA CONNECTION kafka_conn_ssh2 (TOPIC 'testdrive-ssh2-${testdrive.seed}')
              FORMAT TEXT
              ENVELOPE NONE;

            $ kafka-ingest topic=ssh1 format=bytes
            two

            $ kafka-ingest topic=ssh2 format=bytes
            two
            """,
            """
            >[version<7800] CREATE CONNECTION kafka_conn_ssh3
              TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0);
            >[version>=7800] CREATE CONNECTION kafka_conn_ssh3
              TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0, SECURITY PROTOCOL PLAINTEXT);

            > CREATE SOURCE ssh3
              FROM KAFKA CONNECTION kafka_conn_ssh3 (TOPIC 'testdrive-ssh3-${testdrive.seed}')
              FORMAT TEXT
              ENVELOPE NONE;

            $ kafka-ingest topic=ssh1 format=bytes
            three

            $ kafka-ingest topic=ssh2 format=bytes
            three

            $ kafka-ingest topic=ssh3 format=bytes
            three
            """,
        ]
    ]
def validate(self) ‑> Testdrive
Expand source code Browse git
def validate(self) -> Testdrive:
    return Testdrive(
        dedent(
            """
            > SELECT * FROM ssh1;
            one
            two
            three

            > SELECT * FROM ssh2;
            two
            three

            > SELECT * FROM ssh3;
            three
       """
        )
    )
class SshPg (base_version: MzVersion, rng: random.Random | None)

Testing Postgres CDC source with SSH tunnel

Expand source code Browse git
@externally_idempotent(False)
class SshPg(Check):
    """
    Testing Postgres CDC source with SSH tunnel
    """

    def initialize(self) -> Testdrive:
        return Testdrive(
            schemas()
            + dedent(
                """
                > CREATE SECRET pgpass AS 'postgres'

                > CREATE CONNECTION pg_ssh1 TO POSTGRES (
                  HOST postgres,
                  DATABASE postgres,
                  USER postgres,
                  PASSWORD SECRET pgpass,
                  SSL MODE require,
                  SSH TUNNEL ssh_tunnel_0);

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                ALTER USER postgres WITH replication;
                CREATE TABLE t_ssh1 (f1 INTEGER);
                ALTER TABLE t_ssh1 REPLICA IDENTITY FULL;
                CREATE TABLE t_ssh2 (f1 INTEGER);
                ALTER TABLE t_ssh2 REPLICA IDENTITY FULL;
                CREATE TABLE t_ssh3 (f1 INTEGER);
                ALTER TABLE t_ssh3 REPLICA IDENTITY FULL;
                CREATE PUBLICATION mz_source_ssh FOR ALL TABLES;
                INSERT INTO t_ssh1 VALUES (1), (2), (3), (4), (5);

                > CREATE SOURCE mz_source_ssh1
                  FROM POSTGRES CONNECTION pg_ssh1
                  (PUBLICATION 'mz_source_ssh')
                  FOR TABLES (t_ssh1);
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(schemas() + dedent(s))
            for s in [
                """
                > CREATE CONNECTION pg_ssh2 TO POSTGRES (
                  HOST postgres,
                  DATABASE postgres,
                  USER postgres,
                  PASSWORD SECRET pgpass,
                  SSL MODE require,
                  SSH TUNNEL ssh_tunnel_0);

                > CREATE SOURCE mz_source_ssh2
                  FROM POSTGRES CONNECTION pg_ssh2
                  (PUBLICATION 'mz_source_ssh')
                  FOR TABLES (t_ssh2);

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO t_ssh1 VALUES (6), (7), (8), (9), (10);

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO t_ssh2 VALUES (6), (7), (8), (9), (10);
                """,
                """
                > CREATE CONNECTION pg_ssh3 TO POSTGRES (
                  HOST postgres,
                  DATABASE postgres,
                  USER postgres,
                  PASSWORD SECRET pgpass,
                  SSL MODE require,
                  SSH TUNNEL ssh_tunnel_0);

                > CREATE SOURCE mz_source_ssh3
                  FROM POSTGRES CONNECTION pg_ssh3
                  (PUBLICATION 'mz_source_ssh')
                  FOR TABLES (t_ssh3);

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO t_ssh1 VALUES (11), (12), (13), (14), (15);

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO t_ssh2 VALUES (11), (12), (13), (14), (15);

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO t_ssh3 VALUES (11), (12), (13), (14), (15);
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                > SELECT COUNT(*) FROM t_ssh1;
                15

                > SELECT COUNT(*) FROM t_ssh2;
                10

                > SELECT COUNT(*) FROM t_ssh3;
                5
           """
            )
        )

Ancestors

Class variables

var externally_idempotent : bool

Methods

def initialize(self) ‑> Testdrive
Expand source code Browse git
def initialize(self) -> Testdrive:
    return Testdrive(
        schemas()
        + dedent(
            """
            > CREATE SECRET pgpass AS 'postgres'

            > CREATE CONNECTION pg_ssh1 TO POSTGRES (
              HOST postgres,
              DATABASE postgres,
              USER postgres,
              PASSWORD SECRET pgpass,
              SSL MODE require,
              SSH TUNNEL ssh_tunnel_0);

            $ postgres-execute connection=postgres://postgres:postgres@postgres
            ALTER USER postgres WITH replication;
            CREATE TABLE t_ssh1 (f1 INTEGER);
            ALTER TABLE t_ssh1 REPLICA IDENTITY FULL;
            CREATE TABLE t_ssh2 (f1 INTEGER);
            ALTER TABLE t_ssh2 REPLICA IDENTITY FULL;
            CREATE TABLE t_ssh3 (f1 INTEGER);
            ALTER TABLE t_ssh3 REPLICA IDENTITY FULL;
            CREATE PUBLICATION mz_source_ssh FOR ALL TABLES;
            INSERT INTO t_ssh1 VALUES (1), (2), (3), (4), (5);

            > CREATE SOURCE mz_source_ssh1
              FROM POSTGRES CONNECTION pg_ssh1
              (PUBLICATION 'mz_source_ssh')
              FOR TABLES (t_ssh1);
            """
        )
    )
def manipulate(self) ‑> list[Testdrive]
Expand source code Browse git
def manipulate(self) -> list[Testdrive]:
    return [
        Testdrive(schemas() + dedent(s))
        for s in [
            """
            > CREATE CONNECTION pg_ssh2 TO POSTGRES (
              HOST postgres,
              DATABASE postgres,
              USER postgres,
              PASSWORD SECRET pgpass,
              SSL MODE require,
              SSH TUNNEL ssh_tunnel_0);

            > CREATE SOURCE mz_source_ssh2
              FROM POSTGRES CONNECTION pg_ssh2
              (PUBLICATION 'mz_source_ssh')
              FOR TABLES (t_ssh2);

            $ postgres-execute connection=postgres://postgres:postgres@postgres
            INSERT INTO t_ssh1 VALUES (6), (7), (8), (9), (10);

            $ postgres-execute connection=postgres://postgres:postgres@postgres
            INSERT INTO t_ssh2 VALUES (6), (7), (8), (9), (10);
            """,
            """
            > CREATE CONNECTION pg_ssh3 TO POSTGRES (
              HOST postgres,
              DATABASE postgres,
              USER postgres,
              PASSWORD SECRET pgpass,
              SSL MODE require,
              SSH TUNNEL ssh_tunnel_0);

            > CREATE SOURCE mz_source_ssh3
              FROM POSTGRES CONNECTION pg_ssh3
              (PUBLICATION 'mz_source_ssh')
              FOR TABLES (t_ssh3);

            $ postgres-execute connection=postgres://postgres:postgres@postgres
            INSERT INTO t_ssh1 VALUES (11), (12), (13), (14), (15);

            $ postgres-execute connection=postgres://postgres:postgres@postgres
            INSERT INTO t_ssh2 VALUES (11), (12), (13), (14), (15);

            $ postgres-execute connection=postgres://postgres:postgres@postgres
            INSERT INTO t_ssh3 VALUES (11), (12), (13), (14), (15);
            """,
        ]
    ]
def validate(self) ‑> Testdrive
Expand source code Browse git
def validate(self) -> Testdrive:
    return Testdrive(
        dedent(
            """
            > SELECT COUNT(*) FROM t_ssh1;
            15

            > SELECT COUNT(*) FROM t_ssh2;
            10

            > SELECT COUNT(*) FROM t_ssh3;
            5
       """
        )
    )