Module materialize.checks.all_checks.identifiers
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from random import Random
from textwrap import dedent
from typing import Any
from pg8000.converters import literal # type: ignore
from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
from materialize.checks.executors import Executor
from materialize.mz_version import MzVersion
from materialize.util import naughty_strings
def dq(ident: str) -> str:
ident = ident.replace('"', '""')
return f'"{ident}"'
def dq_print(ident: str) -> str:
ident = ident.replace("\\", "\\\\")
ident = ident.replace('"', '\\"')
return f'"{ident}"'
def sq(ident: str) -> Any:
return literal(ident)
def schemas() -> str:
return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
def cluster() -> str:
return "> CREATE CLUSTER identifiers SIZE '4'\n"
class Identifiers(Check):
def _can_run(self, e: Executor) -> bool:
# CREATE ROLE not compatible with older releases
return self.base_version >= MzVersion.parse_mz("v0.47.0-dev")
IDENT_KEYS = [
"db",
"schema",
"type",
"table",
"column",
"value1",
"value2",
# "source",
"source_view",
"kafka_conn",
"csr_conn",
"secret",
# "secret_value",
"mv0",
"mv1",
"mv2",
"sink0",
"sink1",
"sink2",
"alias",
"role",
"comment_table",
"comment_column",
]
def __init__(self, base_version: MzVersion, rng: Random | None) -> None:
strings = naughty_strings()
values = (rng or Random(0)).sample(strings, len(self.IDENT_KEYS))
self.ident = {
key: value.encode("utf-8")[:255].decode("utf-8", "ignore")
for key, value in zip(self.IDENT_KEYS, values)
}
# ERROR: invalid input syntax for type bytea: invalid escape sequence
self.ident["secret_value"] = "secret_value"
# https://github.com/MaterializeInc/materialize/issues/22535
self.ident["source"] = "source"
super().__init__(base_version, rng)
def initialize(self) -> Testdrive:
cmds = f"""
> SET cluster=identifiers;
> CREATE ROLE {dq(self.ident["role"])};
> CREATE DATABASE {dq(self.ident["db"])};
> SET DATABASE={dq(self.ident["db"])};
> CREATE SCHEMA {dq(self.ident["schema"])};
> CREATE TYPE {dq(self.ident["type"])} AS LIST (ELEMENT TYPE = text);
> CREATE TABLE {dq(self.ident["schema"])}.{dq(self.ident["table"])} ({dq(self.ident["column"])} TEXT, c2 {dq(self.ident["type"])});
> INSERT INTO {dq(self.ident["schema"])}.{dq(self.ident["table"])} VALUES ({sq(self.ident["value1"])}, LIST[{sq(self.ident["value2"])}]::{dq(self.ident["type"])});
> CREATE MATERIALIZED VIEW {dq(self.ident["schema"])}.{dq(self.ident["mv0"])} IN CLUSTER {self._default_cluster()} AS
SELECT COUNT({dq(self.ident["column"])}) FROM {dq(self.ident["schema"])}.{dq(self.ident["table"])};
$ kafka-create-topic topic=sink-source-ident
$ kafka-ingest format=avro key-format=avro topic=sink-source-ident key-schema=${{keyschema}} schema=${{schema}} repeat=1000
{{"key1": "U2${{kafka-ingest.iteration}}"}} {{"f1": "A${{kafka-ingest.iteration}}"}}
> CREATE CONNECTION IF NOT EXISTS {dq(self.ident["kafka_conn"])} FOR KAFKA {self._kafka_broker()};
> CREATE CONNECTION IF NOT EXISTS {dq(self.ident["csr_conn"])} FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}';
> CREATE SOURCE {dq(self.ident["source"])}
IN CLUSTER identifiers
FROM KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'testdrive-sink-source-ident-${{testdrive.seed}}')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])}
ENVELOPE UPSERT;
> CREATE MATERIALIZED VIEW {dq(self.ident["source_view"])} IN CLUSTER {self._default_cluster()} AS
SELECT LEFT(key1, 2) as l_k, LEFT(f1, 1) AS l_v, COUNT(*) AS c FROM {dq(self.ident["source"])} GROUP BY LEFT(key1, 2), LEFT(f1, 1);
> CREATE SINK {dq(self.ident["schema"])}.{dq(self.ident["sink0"])}
IN CLUSTER identifiers
FROM {dq(self.ident["source_view"])}
INTO KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'sink-sink-ident0')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])}
ENVELOPE DEBEZIUM;
"""
if self.base_version >= MzVersion(0, 44, 0):
cmds += f"""
> CREATE SECRET {dq(self.ident["secret"])} as {sq(self.ident["secret_value"])};
"""
if self.base_version >= MzVersion(0, 72, 0):
cmds += f"""
> COMMENT ON TABLE {dq(self.ident["schema"])}.{dq(self.ident["table"])} IS {sq(self.ident["comment_table"])};
> COMMENT ON COLUMN {dq(self.ident["schema"])}.{dq(self.ident["table"])}.{dq(self.ident["column"])} IS {sq(self.ident["comment_column"])};
"""
return Testdrive(schemas() + cluster() + dedent(cmds))
def manipulate(self) -> list[Testdrive]:
cmds = [
f"""
> SET CLUSTER=identifiers;
> SET DATABASE={dq(self.ident["db"])};
> CREATE MATERIALIZED VIEW {dq(self.ident["schema"])}.{dq(self.ident["mv" + i])} IN CLUSTER {self._default_cluster()} AS
SELECT {dq(self.ident["column"])}, c2 as {dq(self.ident["alias"])} FROM {dq(self.ident["schema"])}.{dq(self.ident["table"])};
> INSERT INTO {dq(self.ident["schema"])}.{dq(self.ident["table"])} VALUES ({sq(self.ident["value1"])}, LIST[{sq(self.ident["value2"])}]::{dq(self.ident["type"])});
> CREATE SINK {dq(self.ident["schema"])}.{dq(self.ident["sink" + i])}
IN CLUSTER identifiers
FROM {dq(self.ident["source_view"])}
INTO KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'sink-sink-ident')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])}
ENVELOPE DEBEZIUM;
"""
for i in ["1", "2"]
]
return [Testdrive(dedent(s)) for s in cmds]
def validate(self) -> Testdrive:
cmds = f"""
> SHOW DATABASES WHERE name NOT LIKE 'to_be_created%' AND name NOT LIKE 'owner_db%' AND name NOT LIKE 'privilege_db%' AND name <> 'defpriv_db';
materialize
{dq_print(self.ident["db"])}
> SET DATABASE={dq(self.ident["db"])};
> SELECT name FROM mz_roles WHERE name = {sq(self.ident["role"])}
{dq_print(self.ident["role"])}
> SHOW TYPES;
{dq_print(self.ident["type"])}
> SHOW SCHEMAS FROM {dq(self.ident["db"])};
public
information_schema
mz_catalog
mz_unsafe
mz_internal
pg_catalog
{dq_print(self.ident["schema"])}
> SHOW SINKS FROM {dq(self.ident["schema"])};
{dq_print(self.ident["sink0"])} kafka 4 identifiers
{dq_print(self.ident["sink1"])} kafka 4 identifiers
{dq_print(self.ident["sink2"])} kafka 4 identifiers
> SELECT * FROM {dq(self.ident["schema"])}.{dq(self.ident["mv0"])};
3
> SELECT {dq(self.ident["column"])}, {dq(self.ident["alias"])}[1] FROM {dq(self.ident["schema"])}.{dq(self.ident["mv1"])};
{dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
{dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
{dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
> SELECT {dq(self.ident["column"])}, {dq(self.ident["alias"])}[1] FROM {dq(self.ident["schema"])}.{dq(self.ident["mv2"])};
{dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
{dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
{dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
> SELECT * FROM {dq(self.ident["source_view"])};
U2 A 1000
"""
if self.base_version >= MzVersion(0, 72, 0):
cmds += f"""
> SELECT object_sub_id, comment FROM mz_internal.mz_comments JOIN mz_tables ON mz_internal.mz_comments.id = mz_tables.id WHERE name = {sq(self.ident["table"])};
<null> {dq_print(self.ident["comment_table"])}
1 {dq_print(self.ident["comment_column"])}
"""
if self.base_version >= MzVersion(0, 44, 0):
cmds += f"""
> SHOW SECRETS;
{dq_print(self.ident["secret"])}
"""
return Testdrive(dedent(cmds))
Functions
def cluster() ‑> str
-
Expand source code Browse git
def cluster() -> str: return "> CREATE CLUSTER identifiers SIZE '4'\n"
def dq(ident: str) ‑> str
-
Expand source code Browse git
def dq(ident: str) -> str: ident = ident.replace('"', '""') return f'"{ident}"'
def dq_print(ident: str) ‑> str
-
Expand source code Browse git
def dq_print(ident: str) -> str: ident = ident.replace("\\", "\\\\") ident = ident.replace('"', '\\"') return f'"{ident}"'
def schemas() ‑> str
-
Expand source code Browse git
def schemas() -> str: return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
def sq(ident: str) ‑> Any
-
Expand source code Browse git
def sq(ident: str) -> Any: return literal(ident)
Classes
class Identifiers (base_version: MzVersion, rng: random.Random | None)
-
Expand source code Browse git
class Identifiers(Check): def _can_run(self, e: Executor) -> bool: # CREATE ROLE not compatible with older releases return self.base_version >= MzVersion.parse_mz("v0.47.0-dev") IDENT_KEYS = [ "db", "schema", "type", "table", "column", "value1", "value2", # "source", "source_view", "kafka_conn", "csr_conn", "secret", # "secret_value", "mv0", "mv1", "mv2", "sink0", "sink1", "sink2", "alias", "role", "comment_table", "comment_column", ] def __init__(self, base_version: MzVersion, rng: Random | None) -> None: strings = naughty_strings() values = (rng or Random(0)).sample(strings, len(self.IDENT_KEYS)) self.ident = { key: value.encode("utf-8")[:255].decode("utf-8", "ignore") for key, value in zip(self.IDENT_KEYS, values) } # ERROR: invalid input syntax for type bytea: invalid escape sequence self.ident["secret_value"] = "secret_value" # https://github.com/MaterializeInc/materialize/issues/22535 self.ident["source"] = "source" super().__init__(base_version, rng) def initialize(self) -> Testdrive: cmds = f""" > SET cluster=identifiers; > CREATE ROLE {dq(self.ident["role"])}; > CREATE DATABASE {dq(self.ident["db"])}; > SET DATABASE={dq(self.ident["db"])}; > CREATE SCHEMA {dq(self.ident["schema"])}; > CREATE TYPE {dq(self.ident["type"])} AS LIST (ELEMENT TYPE = text); > CREATE TABLE {dq(self.ident["schema"])}.{dq(self.ident["table"])} ({dq(self.ident["column"])} TEXT, c2 {dq(self.ident["type"])}); > INSERT INTO {dq(self.ident["schema"])}.{dq(self.ident["table"])} VALUES ({sq(self.ident["value1"])}, LIST[{sq(self.ident["value2"])}]::{dq(self.ident["type"])}); > CREATE MATERIALIZED VIEW {dq(self.ident["schema"])}.{dq(self.ident["mv0"])} IN CLUSTER {self._default_cluster()} AS SELECT COUNT({dq(self.ident["column"])}) FROM {dq(self.ident["schema"])}.{dq(self.ident["table"])}; $ kafka-create-topic topic=sink-source-ident $ kafka-ingest format=avro key-format=avro topic=sink-source-ident key-schema=${{keyschema}} schema=${{schema}} repeat=1000 {{"key1": "U2${{kafka-ingest.iteration}}"}} {{"f1": "A${{kafka-ingest.iteration}}"}} > CREATE CONNECTION IF NOT EXISTS {dq(self.ident["kafka_conn"])} FOR KAFKA {self._kafka_broker()}; > CREATE CONNECTION IF NOT EXISTS {dq(self.ident["csr_conn"])} FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}'; > CREATE SOURCE {dq(self.ident["source"])} IN CLUSTER identifiers FROM KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'testdrive-sink-source-ident-${{testdrive.seed}}') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])} ENVELOPE UPSERT; > CREATE MATERIALIZED VIEW {dq(self.ident["source_view"])} IN CLUSTER {self._default_cluster()} AS SELECT LEFT(key1, 2) as l_k, LEFT(f1, 1) AS l_v, COUNT(*) AS c FROM {dq(self.ident["source"])} GROUP BY LEFT(key1, 2), LEFT(f1, 1); > CREATE SINK {dq(self.ident["schema"])}.{dq(self.ident["sink0"])} IN CLUSTER identifiers FROM {dq(self.ident["source_view"])} INTO KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'sink-sink-ident0') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])} ENVELOPE DEBEZIUM; """ if self.base_version >= MzVersion(0, 44, 0): cmds += f""" > CREATE SECRET {dq(self.ident["secret"])} as {sq(self.ident["secret_value"])}; """ if self.base_version >= MzVersion(0, 72, 0): cmds += f""" > COMMENT ON TABLE {dq(self.ident["schema"])}.{dq(self.ident["table"])} IS {sq(self.ident["comment_table"])}; > COMMENT ON COLUMN {dq(self.ident["schema"])}.{dq(self.ident["table"])}.{dq(self.ident["column"])} IS {sq(self.ident["comment_column"])}; """ return Testdrive(schemas() + cluster() + dedent(cmds)) def manipulate(self) -> list[Testdrive]: cmds = [ f""" > SET CLUSTER=identifiers; > SET DATABASE={dq(self.ident["db"])}; > CREATE MATERIALIZED VIEW {dq(self.ident["schema"])}.{dq(self.ident["mv" + i])} IN CLUSTER {self._default_cluster()} AS SELECT {dq(self.ident["column"])}, c2 as {dq(self.ident["alias"])} FROM {dq(self.ident["schema"])}.{dq(self.ident["table"])}; > INSERT INTO {dq(self.ident["schema"])}.{dq(self.ident["table"])} VALUES ({sq(self.ident["value1"])}, LIST[{sq(self.ident["value2"])}]::{dq(self.ident["type"])}); > CREATE SINK {dq(self.ident["schema"])}.{dq(self.ident["sink" + i])} IN CLUSTER identifiers FROM {dq(self.ident["source_view"])} INTO KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'sink-sink-ident') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])} ENVELOPE DEBEZIUM; """ for i in ["1", "2"] ] return [Testdrive(dedent(s)) for s in cmds] def validate(self) -> Testdrive: cmds = f""" > SHOW DATABASES WHERE name NOT LIKE 'to_be_created%' AND name NOT LIKE 'owner_db%' AND name NOT LIKE 'privilege_db%' AND name <> 'defpriv_db'; materialize {dq_print(self.ident["db"])} > SET DATABASE={dq(self.ident["db"])}; > SELECT name FROM mz_roles WHERE name = {sq(self.ident["role"])} {dq_print(self.ident["role"])} > SHOW TYPES; {dq_print(self.ident["type"])} > SHOW SCHEMAS FROM {dq(self.ident["db"])}; public information_schema mz_catalog mz_unsafe mz_internal pg_catalog {dq_print(self.ident["schema"])} > SHOW SINKS FROM {dq(self.ident["schema"])}; {dq_print(self.ident["sink0"])} kafka 4 identifiers {dq_print(self.ident["sink1"])} kafka 4 identifiers {dq_print(self.ident["sink2"])} kafka 4 identifiers > SELECT * FROM {dq(self.ident["schema"])}.{dq(self.ident["mv0"])}; 3 > SELECT {dq(self.ident["column"])}, {dq(self.ident["alias"])}[1] FROM {dq(self.ident["schema"])}.{dq(self.ident["mv1"])}; {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])} {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])} {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])} > SELECT {dq(self.ident["column"])}, {dq(self.ident["alias"])}[1] FROM {dq(self.ident["schema"])}.{dq(self.ident["mv2"])}; {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])} {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])} {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])} > SELECT * FROM {dq(self.ident["source_view"])}; U2 A 1000 """ if self.base_version >= MzVersion(0, 72, 0): cmds += f""" > SELECT object_sub_id, comment FROM mz_internal.mz_comments JOIN mz_tables ON mz_internal.mz_comments.id = mz_tables.id WHERE name = {sq(self.ident["table"])}; <null> {dq_print(self.ident["comment_table"])} 1 {dq_print(self.ident["comment_column"])} """ if self.base_version >= MzVersion(0, 44, 0): cmds += f""" > SHOW SECRETS; {dq_print(self.ident["secret"])} """ return Testdrive(dedent(cmds))
Ancestors
Class variables
var IDENT_KEYS
Methods
def initialize(self) ‑> Testdrive
-
Expand source code Browse git
def initialize(self) -> Testdrive: cmds = f""" > SET cluster=identifiers; > CREATE ROLE {dq(self.ident["role"])}; > CREATE DATABASE {dq(self.ident["db"])}; > SET DATABASE={dq(self.ident["db"])}; > CREATE SCHEMA {dq(self.ident["schema"])}; > CREATE TYPE {dq(self.ident["type"])} AS LIST (ELEMENT TYPE = text); > CREATE TABLE {dq(self.ident["schema"])}.{dq(self.ident["table"])} ({dq(self.ident["column"])} TEXT, c2 {dq(self.ident["type"])}); > INSERT INTO {dq(self.ident["schema"])}.{dq(self.ident["table"])} VALUES ({sq(self.ident["value1"])}, LIST[{sq(self.ident["value2"])}]::{dq(self.ident["type"])}); > CREATE MATERIALIZED VIEW {dq(self.ident["schema"])}.{dq(self.ident["mv0"])} IN CLUSTER {self._default_cluster()} AS SELECT COUNT({dq(self.ident["column"])}) FROM {dq(self.ident["schema"])}.{dq(self.ident["table"])}; $ kafka-create-topic topic=sink-source-ident $ kafka-ingest format=avro key-format=avro topic=sink-source-ident key-schema=${{keyschema}} schema=${{schema}} repeat=1000 {{"key1": "U2${{kafka-ingest.iteration}}"}} {{"f1": "A${{kafka-ingest.iteration}}"}} > CREATE CONNECTION IF NOT EXISTS {dq(self.ident["kafka_conn"])} FOR KAFKA {self._kafka_broker()}; > CREATE CONNECTION IF NOT EXISTS {dq(self.ident["csr_conn"])} FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}'; > CREATE SOURCE {dq(self.ident["source"])} IN CLUSTER identifiers FROM KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'testdrive-sink-source-ident-${{testdrive.seed}}') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])} ENVELOPE UPSERT; > CREATE MATERIALIZED VIEW {dq(self.ident["source_view"])} IN CLUSTER {self._default_cluster()} AS SELECT LEFT(key1, 2) as l_k, LEFT(f1, 1) AS l_v, COUNT(*) AS c FROM {dq(self.ident["source"])} GROUP BY LEFT(key1, 2), LEFT(f1, 1); > CREATE SINK {dq(self.ident["schema"])}.{dq(self.ident["sink0"])} IN CLUSTER identifiers FROM {dq(self.ident["source_view"])} INTO KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'sink-sink-ident0') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])} ENVELOPE DEBEZIUM; """ if self.base_version >= MzVersion(0, 44, 0): cmds += f""" > CREATE SECRET {dq(self.ident["secret"])} as {sq(self.ident["secret_value"])}; """ if self.base_version >= MzVersion(0, 72, 0): cmds += f""" > COMMENT ON TABLE {dq(self.ident["schema"])}.{dq(self.ident["table"])} IS {sq(self.ident["comment_table"])}; > COMMENT ON COLUMN {dq(self.ident["schema"])}.{dq(self.ident["table"])}.{dq(self.ident["column"])} IS {sq(self.ident["comment_column"])}; """ return Testdrive(schemas() + cluster() + dedent(cmds))
def manipulate(self) ‑> list[Testdrive]
-
Expand source code Browse git
def manipulate(self) -> list[Testdrive]: cmds = [ f""" > SET CLUSTER=identifiers; > SET DATABASE={dq(self.ident["db"])}; > CREATE MATERIALIZED VIEW {dq(self.ident["schema"])}.{dq(self.ident["mv" + i])} IN CLUSTER {self._default_cluster()} AS SELECT {dq(self.ident["column"])}, c2 as {dq(self.ident["alias"])} FROM {dq(self.ident["schema"])}.{dq(self.ident["table"])}; > INSERT INTO {dq(self.ident["schema"])}.{dq(self.ident["table"])} VALUES ({sq(self.ident["value1"])}, LIST[{sq(self.ident["value2"])}]::{dq(self.ident["type"])}); > CREATE SINK {dq(self.ident["schema"])}.{dq(self.ident["sink" + i])} IN CLUSTER identifiers FROM {dq(self.ident["source_view"])} INTO KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'sink-sink-ident') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])} ENVELOPE DEBEZIUM; """ for i in ["1", "2"] ] return [Testdrive(dedent(s)) for s in cmds]
def validate(self) ‑> Testdrive
-
Expand source code Browse git
def validate(self) -> Testdrive: cmds = f""" > SHOW DATABASES WHERE name NOT LIKE 'to_be_created%' AND name NOT LIKE 'owner_db%' AND name NOT LIKE 'privilege_db%' AND name <> 'defpriv_db'; materialize {dq_print(self.ident["db"])} > SET DATABASE={dq(self.ident["db"])}; > SELECT name FROM mz_roles WHERE name = {sq(self.ident["role"])} {dq_print(self.ident["role"])} > SHOW TYPES; {dq_print(self.ident["type"])} > SHOW SCHEMAS FROM {dq(self.ident["db"])}; public information_schema mz_catalog mz_unsafe mz_internal pg_catalog {dq_print(self.ident["schema"])} > SHOW SINKS FROM {dq(self.ident["schema"])}; {dq_print(self.ident["sink0"])} kafka 4 identifiers {dq_print(self.ident["sink1"])} kafka 4 identifiers {dq_print(self.ident["sink2"])} kafka 4 identifiers > SELECT * FROM {dq(self.ident["schema"])}.{dq(self.ident["mv0"])}; 3 > SELECT {dq(self.ident["column"])}, {dq(self.ident["alias"])}[1] FROM {dq(self.ident["schema"])}.{dq(self.ident["mv1"])}; {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])} {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])} {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])} > SELECT {dq(self.ident["column"])}, {dq(self.ident["alias"])}[1] FROM {dq(self.ident["schema"])}.{dq(self.ident["mv2"])}; {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])} {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])} {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])} > SELECT * FROM {dq(self.ident["source_view"])}; U2 A 1000 """ if self.base_version >= MzVersion(0, 72, 0): cmds += f""" > SELECT object_sub_id, comment FROM mz_internal.mz_comments JOIN mz_tables ON mz_internal.mz_comments.id = mz_tables.id WHERE name = {sq(self.ident["table"])}; <null> {dq_print(self.ident["comment_table"])} 1 {dq_print(self.ident["comment_column"])} """ if self.base_version >= MzVersion(0, 44, 0): cmds += f""" > SHOW SECRETS; {dq_print(self.ident["secret"])} """ return Testdrive(dedent(cmds))