Module materialize.checks.all_checks.identifiers

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from random import Random
from textwrap import dedent
from typing import Any

from pg8000.converters import literal  # type: ignore

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
from materialize.checks.executors import Executor
from materialize.mz_version import MzVersion
from materialize.util import naughty_strings


def dq(ident: str) -> str:
    """Render *ident* as a double-quoted SQL identifier.

    Embedded double quotes are escaped by doubling them, per the SQL
    lexical rules for quoted identifiers.
    """
    escaped = ident.replace('"', '""')
    return '"' + escaped + '"'


def dq_print(ident: str) -> str:
    """Render *ident* the way testdrive prints quoted identifiers.

    Backslashes and double quotes are backslash-escaped (backslashes
    first, so the quote escapes are not double-escaped), then the whole
    thing is wrapped in double quotes.
    """
    escaped = ident.replace("\\", "\\\\").replace('"', '\\"')
    return '"' + escaped + '"'


def sq(ident: str) -> Any:
    """Render *ident* as a single-quoted SQL string literal.

    Delegates escaping to pg8000's ``literal`` so the quoting matches
    what the server-side parser accepts.
    """
    quoted = literal(ident)
    return quoted


def schemas() -> str:
    """Return the dedented Avro schema preamble shared by the Kafka tests."""
    raw = KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
    return dedent(raw)


def cluster() -> str:
    """Return the testdrive command that creates the dedicated test cluster."""
    statement = "> CREATE CLUSTER identifiers SIZE '4'"
    return statement + "\n"


class Identifiers(Check):
    """Check that objects with "naughty" identifiers survive across versions.

    One naughty string (quotes, backslashes, control characters, unicode,
    ...) is sampled per entry of ``IDENT_KEYS`` and used as the name of a
    database object. ``initialize`` creates the objects, ``manipulate``
    layers more objects on top, and ``validate`` verifies everything is
    still addressable and returns the expected data.
    """

    def _can_run(self, e: Executor) -> bool:
        """Only run on versions new enough for the statements used here."""
        # CREATE ROLE not compatible with older releases
        return self.base_version >= MzVersion.parse_mz("v0.47.0-dev")

    # One naughty identifier is sampled per key. The commented-out keys
    # are assigned fixed (non-naughty) values in __init__ instead -- see
    # the notes there for why.
    IDENT_KEYS = [
        "db",
        "schema",
        "type",
        "table",
        "column",
        "value1",
        "value2",
        # "source",
        "source_view",
        "kafka_conn",
        "csr_conn",
        "secret",
        # "secret_value",
        "mv0",
        "mv1",
        "mv2",
        "sink0",
        "sink1",
        "sink2",
        "alias",
        "role",
        "comment_table",
        "comment_column",
    ]

    def __init__(self, base_version: MzVersion, rng: Random | None) -> None:
        """Sample one naughty string per IDENT_KEYS entry.

        With ``rng=None`` a fixed seed is used, so the sampled identifiers
        are deterministic. Each string is truncated to 255 UTF-8 bytes,
        dropping any partially-cut trailing code point via ``"ignore"``,
        to stay within identifier length limits.
        """
        strings = naughty_strings()
        values = (rng or Random(0)).sample(strings, len(self.IDENT_KEYS))
        self.ident = {
            key: value.encode("utf-8")[:255].decode("utf-8", "ignore")
            for key, value in zip(self.IDENT_KEYS, values)
        }
        # ERROR: invalid input syntax for type bytea: invalid escape sequence
        self.ident["secret_value"] = "secret_value"
        # https://github.com/MaterializeInc/materialize/issues/22535
        self.ident["source"] = "source"
        super().__init__(base_version, rng)

    def initialize(self) -> Testdrive:
        """Create role, database, schema, type, table, MVs, Kafka
        connections/source/sink -- and, on new enough versions, a secret
        and comments -- all under the sampled naughty names.
        """
        cmds = f"""
            > SET cluster=identifiers;
            > CREATE ROLE {dq(self.ident["role"])};
            > CREATE DATABASE {dq(self.ident["db"])};
            > SET DATABASE={dq(self.ident["db"])};
            > CREATE SCHEMA {dq(self.ident["schema"])};
            > CREATE TYPE {dq(self.ident["type"])} AS LIST (ELEMENT TYPE = text);
            > CREATE TABLE {dq(self.ident["schema"])}.{dq(self.ident["table"])} ({dq(self.ident["column"])} TEXT, c2 {dq(self.ident["type"])});
            > INSERT INTO {dq(self.ident["schema"])}.{dq(self.ident["table"])} VALUES ({sq(self.ident["value1"])}, LIST[{sq(self.ident["value2"])}]::{dq(self.ident["type"])});
            > CREATE MATERIALIZED VIEW {dq(self.ident["schema"])}.{dq(self.ident["mv0"])} IN CLUSTER {self._default_cluster()} AS
              SELECT COUNT({dq(self.ident["column"])}) FROM {dq(self.ident["schema"])}.{dq(self.ident["table"])};

            $ kafka-create-topic topic=sink-source-ident

            $ kafka-ingest format=avro key-format=avro topic=sink-source-ident key-schema=${{keyschema}} schema=${{schema}} repeat=1000
            {{"key1": "U2${{kafka-ingest.iteration}}"}} {{"f1": "A${{kafka-ingest.iteration}}"}}

            > CREATE CONNECTION IF NOT EXISTS {dq(self.ident["kafka_conn"])} FOR KAFKA {self._kafka_broker()};
            > CREATE CONNECTION IF NOT EXISTS {dq(self.ident["csr_conn"])} FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}';
            > CREATE SOURCE {dq(self.ident["source"])}
              IN CLUSTER identifiers
              FROM KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'testdrive-sink-source-ident-${{testdrive.seed}}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])}
              ENVELOPE UPSERT;
            > CREATE MATERIALIZED VIEW {dq(self.ident["source_view"])} IN CLUSTER {self._default_cluster()} AS
              SELECT LEFT(key1, 2) as l_k, LEFT(f1, 1) AS l_v, COUNT(*) AS c FROM {dq(self.ident["source"])} GROUP BY LEFT(key1, 2), LEFT(f1, 1);
            > CREATE SINK {dq(self.ident["schema"])}.{dq(self.ident["sink0"])}
              IN CLUSTER identifiers
              FROM {dq(self.ident["source_view"])}
              INTO KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'sink-sink-ident0')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])}
              ENVELOPE DEBEZIUM;
            """
        # CREATE SECRET is gated to >= v0.44.0 (older versions reject it).
        if self.base_version >= MzVersion(0, 44, 0):
            cmds += f"""
            > CREATE SECRET {dq(self.ident["secret"])} as {sq(self.ident["secret_value"])};
            """
        # COMMENT ON is gated to >= v0.72.0.
        if self.base_version >= MzVersion(0, 72, 0):
            cmds += f"""
            > COMMENT ON TABLE {dq(self.ident["schema"])}.{dq(self.ident["table"])} IS {sq(self.ident["comment_table"])};

            > COMMENT ON COLUMN {dq(self.ident["schema"])}.{dq(self.ident["table"])}.{dq(self.ident["column"])} IS {sq(self.ident["comment_column"])};
            """

        return Testdrive(schemas() + cluster() + dedent(cmds))

    def manipulate(self) -> list[Testdrive]:
        """Two manipulation phases (i = "1", "2"): each adds a materialized
        view, an extra table row, and a sink under the naughty names.
        """
        cmds = [
            f"""
            > SET CLUSTER=identifiers;
            > SET DATABASE={dq(self.ident["db"])};
            > CREATE MATERIALIZED VIEW {dq(self.ident["schema"])}.{dq(self.ident["mv" + i])} IN CLUSTER {self._default_cluster()} AS
              SELECT {dq(self.ident["column"])}, c2 as {dq(self.ident["alias"])} FROM {dq(self.ident["schema"])}.{dq(self.ident["table"])};
            > INSERT INTO {dq(self.ident["schema"])}.{dq(self.ident["table"])} VALUES ({sq(self.ident["value1"])}, LIST[{sq(self.ident["value2"])}]::{dq(self.ident["type"])});
            > CREATE SINK {dq(self.ident["schema"])}.{dq(self.ident["sink" + i])}
              IN CLUSTER identifiers
              FROM {dq(self.ident["source_view"])}
              INTO KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'sink-sink-ident')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])}
              ENVELOPE DEBEZIUM;
            """
            for i in ["1", "2"]
        ]
        return [Testdrive(dedent(s)) for s in cmds]

    def validate(self) -> Testdrive:
        """Verify every created object is still listed/queriable; expected
        output rows are rendered with dq_print to match testdrive's quoting.
        """
        cmds = f"""
        > SHOW DATABASES WHERE name NOT LIKE 'to_be_created%' AND name NOT LIKE 'owner_db%' AND name NOT LIKE 'privilege_db%' AND name <> 'defpriv_db';
        materialize
        {dq_print(self.ident["db"])}

        > SET DATABASE={dq(self.ident["db"])};

        > SELECT name FROM mz_roles WHERE name = {sq(self.ident["role"])}
        {dq_print(self.ident["role"])}

        > SHOW TYPES;
        {dq_print(self.ident["type"])}

        > SHOW SCHEMAS FROM {dq(self.ident["db"])};
        public
        information_schema
        mz_catalog
        mz_unsafe
        mz_internal
        pg_catalog
        {dq_print(self.ident["schema"])}

        > SHOW SINKS FROM {dq(self.ident["schema"])};
        {dq_print(self.ident["sink0"])} kafka 4 identifiers
        {dq_print(self.ident["sink1"])} kafka 4 identifiers
        {dq_print(self.ident["sink2"])} kafka 4 identifiers

        > SELECT * FROM {dq(self.ident["schema"])}.{dq(self.ident["mv0"])};
        3

        > SELECT {dq(self.ident["column"])}, {dq(self.ident["alias"])}[1] FROM {dq(self.ident["schema"])}.{dq(self.ident["mv1"])};
        {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
        {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
        {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}

        > SELECT {dq(self.ident["column"])}, {dq(self.ident["alias"])}[1] FROM {dq(self.ident["schema"])}.{dq(self.ident["mv2"])};
        {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
        {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
        {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}

        > SELECT * FROM {dq(self.ident["source_view"])};
        U2 A 1000
        """
        # Comment checks mirror the >= v0.72.0 gate in initialize().
        if self.base_version >= MzVersion(0, 72, 0):
            cmds += f"""
        > SELECT object_sub_id, comment FROM mz_internal.mz_comments JOIN mz_tables ON mz_internal.mz_comments.id = mz_tables.id WHERE name = {sq(self.ident["table"])};
        <null> {dq_print(self.ident["comment_table"])}
        1 {dq_print(self.ident["comment_column"])}
        """
        # Secret check mirrors the >= v0.44.0 gate in initialize().
        if self.base_version >= MzVersion(0, 44, 0):
            cmds += f"""
        > SHOW SECRETS;
        {dq_print(self.ident["secret"])}
        """
        return Testdrive(dedent(cmds))

Functions

def cluster() ‑> str
Expand source code Browse git
def cluster() -> str:
    return "> CREATE CLUSTER identifiers SIZE '4'\n"
def dq(ident: str) ‑> str
Expand source code Browse git
def dq(ident: str) -> str:
    ident = ident.replace('"', '""')
    return f'"{ident}"'
def dq_print(ident: str) ‑> str
Expand source code Browse git
def dq_print(ident: str) -> str:
    ident = ident.replace("\\", "\\\\")
    ident = ident.replace('"', '\\"')
    return f'"{ident}"'
def schemas() ‑> str
Expand source code Browse git
def schemas() -> str:
    return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
def sq(ident: str) ‑> Any
Expand source code Browse git
def sq(ident: str) -> Any:
    return literal(ident)

Classes

class Identifiers (base_version: MzVersion, rng: random.Random | None)
Expand source code Browse git
class Identifiers(Check):
    def _can_run(self, e: Executor) -> bool:
        # CREATE ROLE not compatible with older releases
        return self.base_version >= MzVersion.parse_mz("v0.47.0-dev")

    IDENT_KEYS = [
        "db",
        "schema",
        "type",
        "table",
        "column",
        "value1",
        "value2",
        # "source",
        "source_view",
        "kafka_conn",
        "csr_conn",
        "secret",
        # "secret_value",
        "mv0",
        "mv1",
        "mv2",
        "sink0",
        "sink1",
        "sink2",
        "alias",
        "role",
        "comment_table",
        "comment_column",
    ]

    def __init__(self, base_version: MzVersion, rng: Random | None) -> None:
        strings = naughty_strings()
        values = (rng or Random(0)).sample(strings, len(self.IDENT_KEYS))
        self.ident = {
            key: value.encode("utf-8")[:255].decode("utf-8", "ignore")
            for key, value in zip(self.IDENT_KEYS, values)
        }
        # ERROR: invalid input syntax for type bytea: invalid escape sequence
        self.ident["secret_value"] = "secret_value"
        # https://github.com/MaterializeInc/materialize/issues/22535
        self.ident["source"] = "source"
        super().__init__(base_version, rng)

    def initialize(self) -> Testdrive:
        cmds = f"""
            > SET cluster=identifiers;
            > CREATE ROLE {dq(self.ident["role"])};
            > CREATE DATABASE {dq(self.ident["db"])};
            > SET DATABASE={dq(self.ident["db"])};
            > CREATE SCHEMA {dq(self.ident["schema"])};
            > CREATE TYPE {dq(self.ident["type"])} AS LIST (ELEMENT TYPE = text);
            > CREATE TABLE {dq(self.ident["schema"])}.{dq(self.ident["table"])} ({dq(self.ident["column"])} TEXT, c2 {dq(self.ident["type"])});
            > INSERT INTO {dq(self.ident["schema"])}.{dq(self.ident["table"])} VALUES ({sq(self.ident["value1"])}, LIST[{sq(self.ident["value2"])}]::{dq(self.ident["type"])});
            > CREATE MATERIALIZED VIEW {dq(self.ident["schema"])}.{dq(self.ident["mv0"])} IN CLUSTER {self._default_cluster()} AS
              SELECT COUNT({dq(self.ident["column"])}) FROM {dq(self.ident["schema"])}.{dq(self.ident["table"])};

            $ kafka-create-topic topic=sink-source-ident

            $ kafka-ingest format=avro key-format=avro topic=sink-source-ident key-schema=${{keyschema}} schema=${{schema}} repeat=1000
            {{"key1": "U2${{kafka-ingest.iteration}}"}} {{"f1": "A${{kafka-ingest.iteration}}"}}

            > CREATE CONNECTION IF NOT EXISTS {dq(self.ident["kafka_conn"])} FOR KAFKA {self._kafka_broker()};
            > CREATE CONNECTION IF NOT EXISTS {dq(self.ident["csr_conn"])} FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}';
            > CREATE SOURCE {dq(self.ident["source"])}
              IN CLUSTER identifiers
              FROM KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'testdrive-sink-source-ident-${{testdrive.seed}}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])}
              ENVELOPE UPSERT;
            > CREATE MATERIALIZED VIEW {dq(self.ident["source_view"])} IN CLUSTER {self._default_cluster()} AS
              SELECT LEFT(key1, 2) as l_k, LEFT(f1, 1) AS l_v, COUNT(*) AS c FROM {dq(self.ident["source"])} GROUP BY LEFT(key1, 2), LEFT(f1, 1);
            > CREATE SINK {dq(self.ident["schema"])}.{dq(self.ident["sink0"])}
              IN CLUSTER identifiers
              FROM {dq(self.ident["source_view"])}
              INTO KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'sink-sink-ident0')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])}
              ENVELOPE DEBEZIUM;
            """
        if self.base_version >= MzVersion(0, 44, 0):
            cmds += f"""
            > CREATE SECRET {dq(self.ident["secret"])} as {sq(self.ident["secret_value"])};
            """
        if self.base_version >= MzVersion(0, 72, 0):
            cmds += f"""
            > COMMENT ON TABLE {dq(self.ident["schema"])}.{dq(self.ident["table"])} IS {sq(self.ident["comment_table"])};

            > COMMENT ON COLUMN {dq(self.ident["schema"])}.{dq(self.ident["table"])}.{dq(self.ident["column"])} IS {sq(self.ident["comment_column"])};
            """

        return Testdrive(schemas() + cluster() + dedent(cmds))

    def manipulate(self) -> list[Testdrive]:
        cmds = [
            f"""
            > SET CLUSTER=identifiers;
            > SET DATABASE={dq(self.ident["db"])};
            > CREATE MATERIALIZED VIEW {dq(self.ident["schema"])}.{dq(self.ident["mv" + i])} IN CLUSTER {self._default_cluster()} AS
              SELECT {dq(self.ident["column"])}, c2 as {dq(self.ident["alias"])} FROM {dq(self.ident["schema"])}.{dq(self.ident["table"])};
            > INSERT INTO {dq(self.ident["schema"])}.{dq(self.ident["table"])} VALUES ({sq(self.ident["value1"])}, LIST[{sq(self.ident["value2"])}]::{dq(self.ident["type"])});
            > CREATE SINK {dq(self.ident["schema"])}.{dq(self.ident["sink" + i])}
              IN CLUSTER identifiers
              FROM {dq(self.ident["source_view"])}
              INTO KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'sink-sink-ident')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])}
              ENVELOPE DEBEZIUM;
            """
            for i in ["1", "2"]
        ]
        return [Testdrive(dedent(s)) for s in cmds]

    def validate(self) -> Testdrive:
        cmds = f"""
        > SHOW DATABASES WHERE name NOT LIKE 'to_be_created%' AND name NOT LIKE 'owner_db%' AND name NOT LIKE 'privilege_db%' AND name <> 'defpriv_db';
        materialize
        {dq_print(self.ident["db"])}

        > SET DATABASE={dq(self.ident["db"])};

        > SELECT name FROM mz_roles WHERE name = {sq(self.ident["role"])}
        {dq_print(self.ident["role"])}

        > SHOW TYPES;
        {dq_print(self.ident["type"])}

        > SHOW SCHEMAS FROM {dq(self.ident["db"])};
        public
        information_schema
        mz_catalog
        mz_unsafe
        mz_internal
        pg_catalog
        {dq_print(self.ident["schema"])}

        > SHOW SINKS FROM {dq(self.ident["schema"])};
        {dq_print(self.ident["sink0"])} kafka 4 identifiers
        {dq_print(self.ident["sink1"])} kafka 4 identifiers
        {dq_print(self.ident["sink2"])} kafka 4 identifiers

        > SELECT * FROM {dq(self.ident["schema"])}.{dq(self.ident["mv0"])};
        3

        > SELECT {dq(self.ident["column"])}, {dq(self.ident["alias"])}[1] FROM {dq(self.ident["schema"])}.{dq(self.ident["mv1"])};
        {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
        {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
        {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}

        > SELECT {dq(self.ident["column"])}, {dq(self.ident["alias"])}[1] FROM {dq(self.ident["schema"])}.{dq(self.ident["mv2"])};
        {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
        {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
        {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}

        > SELECT * FROM {dq(self.ident["source_view"])};
        U2 A 1000
        """
        if self.base_version >= MzVersion(0, 72, 0):
            cmds += f"""
        > SELECT object_sub_id, comment FROM mz_internal.mz_comments JOIN mz_tables ON mz_internal.mz_comments.id = mz_tables.id WHERE name = {sq(self.ident["table"])};
        <null> {dq_print(self.ident["comment_table"])}
        1 {dq_print(self.ident["comment_column"])}
        """
        if self.base_version >= MzVersion(0, 44, 0):
            cmds += f"""
        > SHOW SECRETS;
        {dq_print(self.ident["secret"])}
        """
        return Testdrive(dedent(cmds))

Ancestors

Class variables

var IDENT_KEYS

Methods

def initialize(self) ‑> Testdrive
Expand source code Browse git
def initialize(self) -> Testdrive:
    cmds = f"""
        > SET cluster=identifiers;
        > CREATE ROLE {dq(self.ident["role"])};
        > CREATE DATABASE {dq(self.ident["db"])};
        > SET DATABASE={dq(self.ident["db"])};
        > CREATE SCHEMA {dq(self.ident["schema"])};
        > CREATE TYPE {dq(self.ident["type"])} AS LIST (ELEMENT TYPE = text);
        > CREATE TABLE {dq(self.ident["schema"])}.{dq(self.ident["table"])} ({dq(self.ident["column"])} TEXT, c2 {dq(self.ident["type"])});
        > INSERT INTO {dq(self.ident["schema"])}.{dq(self.ident["table"])} VALUES ({sq(self.ident["value1"])}, LIST[{sq(self.ident["value2"])}]::{dq(self.ident["type"])});
        > CREATE MATERIALIZED VIEW {dq(self.ident["schema"])}.{dq(self.ident["mv0"])} IN CLUSTER {self._default_cluster()} AS
          SELECT COUNT({dq(self.ident["column"])}) FROM {dq(self.ident["schema"])}.{dq(self.ident["table"])};

        $ kafka-create-topic topic=sink-source-ident

        $ kafka-ingest format=avro key-format=avro topic=sink-source-ident key-schema=${{keyschema}} schema=${{schema}} repeat=1000
        {{"key1": "U2${{kafka-ingest.iteration}}"}} {{"f1": "A${{kafka-ingest.iteration}}"}}

        > CREATE CONNECTION IF NOT EXISTS {dq(self.ident["kafka_conn"])} FOR KAFKA {self._kafka_broker()};
        > CREATE CONNECTION IF NOT EXISTS {dq(self.ident["csr_conn"])} FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}';
        > CREATE SOURCE {dq(self.ident["source"])}
          IN CLUSTER identifiers
          FROM KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'testdrive-sink-source-ident-${{testdrive.seed}}')
          FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])}
          ENVELOPE UPSERT;
        > CREATE MATERIALIZED VIEW {dq(self.ident["source_view"])} IN CLUSTER {self._default_cluster()} AS
          SELECT LEFT(key1, 2) as l_k, LEFT(f1, 1) AS l_v, COUNT(*) AS c FROM {dq(self.ident["source"])} GROUP BY LEFT(key1, 2), LEFT(f1, 1);
        > CREATE SINK {dq(self.ident["schema"])}.{dq(self.ident["sink0"])}
          IN CLUSTER identifiers
          FROM {dq(self.ident["source_view"])}
          INTO KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'sink-sink-ident0')
          FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])}
          ENVELOPE DEBEZIUM;
        """
    if self.base_version >= MzVersion(0, 44, 0):
        cmds += f"""
        > CREATE SECRET {dq(self.ident["secret"])} as {sq(self.ident["secret_value"])};
        """
    if self.base_version >= MzVersion(0, 72, 0):
        cmds += f"""
        > COMMENT ON TABLE {dq(self.ident["schema"])}.{dq(self.ident["table"])} IS {sq(self.ident["comment_table"])};

        > COMMENT ON COLUMN {dq(self.ident["schema"])}.{dq(self.ident["table"])}.{dq(self.ident["column"])} IS {sq(self.ident["comment_column"])};
        """

    return Testdrive(schemas() + cluster() + dedent(cmds))
def manipulate(self) ‑> list[Testdrive]
Expand source code Browse git
def manipulate(self) -> list[Testdrive]:
    cmds = [
        f"""
        > SET CLUSTER=identifiers;
        > SET DATABASE={dq(self.ident["db"])};
        > CREATE MATERIALIZED VIEW {dq(self.ident["schema"])}.{dq(self.ident["mv" + i])} IN CLUSTER {self._default_cluster()} AS
          SELECT {dq(self.ident["column"])}, c2 as {dq(self.ident["alias"])} FROM {dq(self.ident["schema"])}.{dq(self.ident["table"])};
        > INSERT INTO {dq(self.ident["schema"])}.{dq(self.ident["table"])} VALUES ({sq(self.ident["value1"])}, LIST[{sq(self.ident["value2"])}]::{dq(self.ident["type"])});
        > CREATE SINK {dq(self.ident["schema"])}.{dq(self.ident["sink" + i])}
          IN CLUSTER identifiers
          FROM {dq(self.ident["source_view"])}
          INTO KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'sink-sink-ident')
          FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])}
          ENVELOPE DEBEZIUM;
        """
        for i in ["1", "2"]
    ]
    return [Testdrive(dedent(s)) for s in cmds]
def validate(self) ‑> Testdrive
Expand source code Browse git
def validate(self) -> Testdrive:
    cmds = f"""
    > SHOW DATABASES WHERE name NOT LIKE 'to_be_created%' AND name NOT LIKE 'owner_db%' AND name NOT LIKE 'privilege_db%' AND name <> 'defpriv_db';
    materialize
    {dq_print(self.ident["db"])}

    > SET DATABASE={dq(self.ident["db"])};

    > SELECT name FROM mz_roles WHERE name = {sq(self.ident["role"])}
    {dq_print(self.ident["role"])}

    > SHOW TYPES;
    {dq_print(self.ident["type"])}

    > SHOW SCHEMAS FROM {dq(self.ident["db"])};
    public
    information_schema
    mz_catalog
    mz_unsafe
    mz_internal
    pg_catalog
    {dq_print(self.ident["schema"])}

    > SHOW SINKS FROM {dq(self.ident["schema"])};
    {dq_print(self.ident["sink0"])} kafka 4 identifiers
    {dq_print(self.ident["sink1"])} kafka 4 identifiers
    {dq_print(self.ident["sink2"])} kafka 4 identifiers

    > SELECT * FROM {dq(self.ident["schema"])}.{dq(self.ident["mv0"])};
    3

    > SELECT {dq(self.ident["column"])}, {dq(self.ident["alias"])}[1] FROM {dq(self.ident["schema"])}.{dq(self.ident["mv1"])};
    {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
    {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
    {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}

    > SELECT {dq(self.ident["column"])}, {dq(self.ident["alias"])}[1] FROM {dq(self.ident["schema"])}.{dq(self.ident["mv2"])};
    {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
    {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
    {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}

    > SELECT * FROM {dq(self.ident["source_view"])};
    U2 A 1000
    """
    if self.base_version >= MzVersion(0, 72, 0):
        cmds += f"""
    > SELECT object_sub_id, comment FROM mz_internal.mz_comments JOIN mz_tables ON mz_internal.mz_comments.id = mz_tables.id WHERE name = {sq(self.ident["table"])};
    <null> {dq_print(self.ident["comment_table"])}
    1 {dq_print(self.ident["comment_column"])}
    """
    if self.base_version >= MzVersion(0, 44, 0):
        cmds += f"""
    > SHOW SECRETS;
    {dq_print(self.ident["secret"])}
    """
    return Testdrive(dedent(cmds))