Module materialize.checks.all_checks.sink

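Platform checks for Kafka sinks. Each check builds a materialized view over
ingested data, sinks it with ENVELOPE DEBEZIUM, and validates the result by
re-ingesting the sink topics: SinkUpsert exercises an upsert source,
SinkTables a large transaction from a table, SinkNullDefaults the NULL
DEFAULTS Avro option, and SinkComments Avro doc comments.
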
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from textwrap import dedent

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check, externally_idempotent
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
from materialize.checks.executors import Executor
from materialize.mz_version import MzVersion


def schemas() -> str:
    return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)


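# Unlike schemas(), both value fields here are nullable Avro unions, so the
# kafka-ingest payloads using this schema must name the union branch for
# non-null values, e.g. {"f1": {"string": "A0"}, "f2": null}.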
def schemas_null() -> str:
    return dedent(
        """
           $ set keyschema={
               "type": "record",
               "name": "Key",
               "fields": [
                   {"name": "key1", "type": "string"}
               ]
             }

           $ set schema={
               "type" : "record",
               "name" : "test",
               "fields" : [
                   {"name":"f1", "type":["null", "string"]},
                   {"name":"f2", "type":["long", "null"]}
               ]
             }
    """
    )


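# Each check runs in three phases: initialize() once up front, the two
# manipulate() fragments at later points in a scenario (for example, on either
# side of an upgrade), and validate() at the end. @externally_idempotent(False)
# records that these checks have side effects outside Materialize: they write
# to Kafka sink topics.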
@externally_idempotent(False)
class SinkUpsert(Check):
    """Basic Check on sinks from an upsert source"""

    def initialize(self) -> Testdrive:
        return Testdrive(
            schemas()
            + dedent(
                """
                $ kafka-create-topic topic=sink-source

                $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}

                $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "D2${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}

                $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "U3${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}

                $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "D3${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}

                > CREATE SOURCE sink_source
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-source-${testdrive.seed}')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE UPSERT

                > CREATE MATERIALIZED VIEW sink_source_view AS SELECT LEFT(key1, 2) AS l_k, LEFT(f1, 1) AS l_v, COUNT(*) AS c FROM sink_source GROUP BY LEFT(key1, 2), LEFT(f1, 1);

                > CREATE SINK sink_sink1 FROM sink_source_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink1')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(schemas() + dedent(s))
            for s in [
                """
                $[version>=5200] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize

                $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "I2${kafka-ingest.iteration}"} {"f1": "B${kafka-ingest.iteration}"}
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": "B${kafka-ingest.iteration}"}
                {"key1": "D2${kafka-ingest.iteration}"}

                > CREATE SINK sink_sink2 FROM sink_source_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink2')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM
                """,
                """
                $[version>=5200] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize

                $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "I3${kafka-ingest.iteration}"} {"f1": "C${kafka-ingest.iteration}"}
                {"key1": "U3${kafka-ingest.iteration}"} {"f1": "C${kafka-ingest.iteration}"}
                {"key1": "D3${kafka-ingest.iteration}"}

                > CREATE SINK sink_sink3 FROM sink_source_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink3')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize

                $[version>=5900] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT CREATECLUSTER ON SYSTEM TO materialize

                $[version<5900] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                ALTER ROLE materialize CREATECLUSTER

                > SELECT * FROM sink_source_view;
                I2 B 1000
                I3 C 1000
                U2 B 1000
                U3 C 1000

                # We check the contents of the sink topics by re-ingesting them.
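                # ENVELOPE NONE ingests the raw Debezium envelope, exposing
                # each message as a row with nullable `before` and `after`
                # record fields.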

                > CREATE SOURCE sink_view1
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink1')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE

                > CREATE SOURCE sink_view2
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink2')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE

                > CREATE SOURCE sink_view3
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink3')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE

                # Validate the sink by aggregating all the 'before' and 'after' records using SQL
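                # Summing the `after` counts and subtracting the `before`
                # counts nets retractions against insertions, leaving exactly
                # the current contents of sink_source_view.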
                > SELECT l_v, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v, (after).l_k, (after).c FROM sink_view1
                    UNION ALL
                    SELECT (before).l_v, (before).l_k, -(before).c FROM sink_view1
                  ) GROUP BY l_v, l_k
                  HAVING SUM(c) > 0;
                B I2 1000
                B U2 1000
                C I3 1000
                C U3 1000

                > SELECT l_v, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v, (after).l_k, (after).c FROM sink_view2
                    UNION ALL
                    SELECT (before).l_v, (before).l_k, -(before).c FROM sink_view2
                  ) GROUP BY l_v, l_k
                  HAVING SUM(c) > 0;
                B I2 1000
                B U2 1000
                C I3 1000
                C U3 1000

                > SELECT l_v, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v, (after).l_k, (after).c FROM sink_view3
                    UNION ALL
                    SELECT (before).l_v, (before).l_k, -(before).c FROM sink_view3
                  ) GROUP BY l_v, l_k
                  HAVING SUM(c) > 0;
                B I2 1000
                B U2 1000
                C I3 1000
                C U3 1000

                > DROP SOURCE sink_view1 CASCADE;

                > DROP SOURCE sink_view2 CASCADE;

                > DROP SOURCE sink_view3 CASCADE;
            """
            )
        )


@externally_idempotent(False)
class SinkTables(Check):
    """Sink and re-ingest a large transaction from a table source"""

    def initialize(self) -> Testdrive:
        return Testdrive(
            schemas()
            + dedent(
                """
                $[version>=5500] postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
                ALTER SYSTEM SET enable_table_keys = true;
                """
            )
            + dedent(
                """
                > CREATE TABLE sink_large_transaction_table (f1 INTEGER, f2 TEXT, PRIMARY KEY (f1));
                > CREATE DEFAULT INDEX ON sink_large_transaction_table;

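                # 100,000 rows of ~1 KiB each, inserted in a single statement,
                # form the large transaction that the sink must emit.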
                > INSERT INTO sink_large_transaction_table SELECT generate_series, REPEAT('x', 1024) FROM generate_series(1, 100000);

                > CREATE MATERIALIZED VIEW sink_large_transaction_view AS SELECT f1 - 1 AS f1 , f2 FROM sink_large_transaction_table;

                > CREATE CLUSTER sink_large_transaction_sink1_cluster SIZE '4';

                > CREATE SINK sink_large_transaction_sink1
                  IN CLUSTER sink_large_transaction_sink1_cluster
                  FROM sink_large_transaction_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-large-transaction-sink-${testdrive.seed}')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM;
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(schemas() + dedent(s))
            for s in [
                """
                > UPDATE sink_large_transaction_table SET f2 = REPEAT('y', 1024)
                """,
                """
                > UPDATE sink_large_transaction_table SET f2 = REPEAT('z', 1024)
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                $ schema-registry-verify schema-type=avro subject=testdrive-sink-large-transaction-sink-${testdrive.seed}-value
                {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"f1","type":"int"},{"name":"f2","type":["null","string"]}]}]},{"name":"after","type":["null","row"]}]}

                # We check the contents of the sink topics by re-ingesting them.
                > CREATE SOURCE sink_large_transaction_source
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-large-transaction-sink-${testdrive.seed}')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE

                > CREATE MATERIALIZED VIEW sink_large_transaction_view2
                  AS
                  SELECT COUNT(*) AS c1 , COUNT(f1) AS c2, COUNT(DISTINCT f1) AS c3 , MIN(f1), MAX(f1)
                  FROM (
                    SELECT (before).f1, (before).f2 FROM sink_large_transaction_source
                  )

                > CREATE MATERIALIZED VIEW sink_large_transaction_view3
                  AS
                  SELECT COUNT(*) AS c1 , COUNT(f1) AS c2, COUNT(DISTINCT f1) AS c3 , MIN(f1), MAX(f1)
                  FROM (
                    SELECT (after).f1, (after).f2 FROM sink_large_transaction_source
                  )

                > CREATE MATERIALIZED VIEW sink_large_transaction_view4
                  AS
                  SELECT LEFT(f2, 1), SUM(c)
                  FROM (
                    SELECT (after).f2, COUNT(*) AS c FROM sink_large_transaction_source GROUP BY (after).f2
                    UNION ALL
                    SELECT (before).f2, -COUNT(*) AS c  FROM sink_large_transaction_source GROUP BY (before).f2
                  )
                  GROUP BY f2

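                # Expected record counts: the snapshot contributes 100k
                # after-only inserts; each UPDATE contributes 100k before-only
                # retractions plus 100k after-only insertions, for 500k
                # envelope records in total.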
                > SELECT * FROM sink_large_transaction_view2
                500000 200000 100000 0 99999

                > SELECT * FROM sink_large_transaction_view3
                500000 300000 100000 0 99999

                > SELECT * FROM sink_large_transaction_view4
                <null> -100000
                x 0
                y 0
                z 100000

                > DROP SOURCE sink_large_transaction_source CASCADE;
            """
            )
        )


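# The ( NULL DEFAULTS ) format option gives every nullable field in the
# published Avro schema a "default": null; the schema-registry-verify steps in
# validate() assert exactly that shape.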
@externally_idempotent(False)
class SinkNullDefaults(Check):
    """Check on an Avro sink with NULL DEFAULTS"""

    def _can_run(self, e: Executor) -> bool:
        return self.base_version >= MzVersion.parse_mz("v0.71.0-dev")

    def initialize(self) -> Testdrive:
        return Testdrive(
            schemas_null()
            + dedent(
                """
                $ kafka-create-topic topic=sink-source-null

                $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": {"string": "A${kafka-ingest.iteration}"}, "f2": null}

                $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "D2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}

                $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "U3${kafka-ingest.iteration}"} {"f1": {"string": "A${kafka-ingest.iteration}"}, "f2": null}

                $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "D3${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}

                > CREATE SOURCE sink_source_null
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-source-null-${testdrive.seed}')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE UPSERT

                > CREATE MATERIALIZED VIEW sink_source_null_view AS SELECT LEFT(key1, 2) AS l_k, LEFT(f1, 1) AS l_v1, f2 / 100 AS l_v2, COUNT(*) AS c FROM sink_source_null GROUP BY LEFT(key1, 2), LEFT(f1, 1), f2 / 100;

                > CREATE SINK sink_sink_null1 FROM sink_source_null_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null1')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ( NULL DEFAULTS )
                  ENVELOPE DEBEZIUM
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(schemas_null() + dedent(s))
            for s in [
                """
                $[version>=5200] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_null_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize

                $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "I2${kafka-ingest.iteration}"} {"f1": {"string": "B${kafka-ingest.iteration}"}, "f2": null}
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
                {"key1": "D2${kafka-ingest.iteration}"}

                > CREATE SINK sink_sink_null2 FROM sink_source_null_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null2')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ( NULL DEFAULTS )
                  ENVELOPE DEBEZIUM
                """,
                """
                $[version>=5200] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_null_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize

                $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "I2${kafka-ingest.iteration}"} {"f1": {"string": "B${kafka-ingest.iteration}"}, "f2": null}
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
                {"key1": "D2${kafka-ingest.iteration}"}

                > CREATE SINK sink_sink_null3 FROM sink_source_null_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null3')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ( NULL DEFAULTS )
                  ENVELOPE DEBEZIUM
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                $ schema-registry-verify schema-type=avro subject=sink-sink-null1-value
                {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null},{"name":"l_v2","type":["null","long"],"default":null},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}

                $ schema-registry-verify schema-type=avro subject=sink-sink-null2-value
                {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null},{"name":"l_v2","type":["null","long"],"default":null},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}

                $ schema-registry-verify schema-type=avro subject=sink-sink-null3-value
                {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null},{"name":"l_v2","type":["null","long"],"default":null},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}

                $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_null_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize

                $[version>=5900] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT CREATECLUSTER ON SYSTEM TO materialize

                $[version<5900] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                ALTER ROLE materialize CREATECLUSTER

                > SELECT * FROM sink_source_null_view;
                D3 <null> 0 100
                D3 <null> 1 100
                D3 <null> 2 100
                D3 <null> 3 100
                D3 <null> 4 100
                D3 <null> 5 100
                D3 <null> 6 100
                D3 <null> 7 100
                D3 <null> 8 100
                D3 <null> 9 100
                I2 B <null> 1000
                U2 <null> 0 100
                U2 <null> 1 100
                U2 <null> 2 100
                U2 <null> 3 100
                U2 <null> 4 100
                U2 <null> 5 100
                U2 <null> 6 100
                U2 <null> 7 100
                U2 <null> 8 100
                U2 <null> 9 100
                U3 A <null> 1000

                # We check the contents of the sink topics by re-ingesting them.

                > CREATE SOURCE sink_view_null1
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null1')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE

                > CREATE SOURCE sink_view_null2
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null2')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE

                > CREATE SOURCE sink_view_null3
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null3')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE

                # Validate the sink by aggregating all the 'before' and 'after' records using SQL
                > SELECT l_v1, l_v2, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_null1
                    UNION ALL
                    SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_null1
                  ) GROUP BY l_v1, l_v2, l_k
                  HAVING SUM(c) > 0;
                <null> 0 D3 100
                <null> 0 U2 100
                <null> 1 D3 100
                <null> 1 U2 100
                <null> 2 D3 100
                <null> 2 U2 100
                <null> 3 D3 100
                <null> 3 U2 100
                <null> 4 D3 100
                <null> 4 U2 100
                <null> 5 D3 100
                <null> 5 U2 100
                <null> 6 D3 100
                <null> 6 U2 100
                <null> 7 D3 100
                <null> 7 U2 100
                <null> 8 D3 100
                <null> 8 U2 100
                <null> 9 D3 100
                <null> 9 U2 100
                A <null> U3 1000
                B <null> I2 1000

                > SELECT l_v1, l_v2, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_null2
                    UNION ALL
                    SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_null2
                  ) GROUP BY l_v1, l_v2, l_k
                  HAVING SUM(c) > 0;
                <null> 0 D3 100
                <null> 0 U2 100
                <null> 1 D3 100
                <null> 1 U2 100
                <null> 2 D3 100
                <null> 2 U2 100
                <null> 3 D3 100
                <null> 3 U2 100
                <null> 4 D3 100
                <null> 4 U2 100
                <null> 5 D3 100
                <null> 5 U2 100
                <null> 6 D3 100
                <null> 6 U2 100
                <null> 7 D3 100
                <null> 7 U2 100
                <null> 8 D3 100
                <null> 8 U2 100
                <null> 9 D3 100
                <null> 9 U2 100
                A <null> U3 1000
                B <null> I2 1000

                > SELECT l_v1, l_v2, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_null3
                    UNION ALL
                    SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_null3
                  ) GROUP BY l_v1, l_v2, l_k
                  HAVING SUM(c) > 0;
                <null> 0 D3 100
                <null> 0 U2 100
                <null> 1 D3 100
                <null> 1 U2 100
                <null> 2 D3 100
                <null> 2 U2 100
                <null> 3 D3 100
                <null> 3 U2 100
                <null> 4 D3 100
                <null> 4 U2 100
                <null> 5 D3 100
                <null> 5 U2 100
                <null> 6 D3 100
                <null> 6 U2 100
                <null> 7 D3 100
                <null> 7 U2 100
                <null> 8 D3 100
                <null> 8 U2 100
                <null> 9 D3 100
                <null> 9 U2 100
                A <null> U3 1000
                B <null> I2 1000

                > DROP SOURCE sink_view_null1 CASCADE;

                > DROP SOURCE sink_view_null2 CASCADE;

                > DROP SOURCE sink_view_null3 CASCADE;
            """
            )
        )


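# DOC ON COLUMN attaches an Avro "doc" string to a field, KEY DOC / VALUE DOC
# target only the key or value schema, and the COMMENT ON the view becomes the
# record-level "doc"; validate() verifies the resulting registry schemas.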
@externally_idempotent(False)
class SinkComments(Check):
    """Check on an Avro sink with comments"""

    def _can_run(self, e: Executor) -> bool:
        return self.base_version >= MzVersion.parse_mz("v0.73.0-dev")

    def initialize(self) -> Testdrive:
        return Testdrive(
            schemas_null()
            + dedent(
                """
                $ kafka-create-topic topic=sink-source-comments

                $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": {"string": "A${kafka-ingest.iteration}"}, "f2": null}

                $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "D2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}

                $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "U3${kafka-ingest.iteration}"} {"f1": {"string": "A${kafka-ingest.iteration}"}, "f2": null}

                $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "D3${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}

                > CREATE SOURCE sink_source_comments
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-source-comments-${testdrive.seed}')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE UPSERT

                > CREATE MATERIALIZED VIEW sink_source_comments_view AS SELECT LEFT(key1, 2) AS l_k, LEFT(f1, 1) AS l_v1, f2 / 100 AS l_v2, COUNT(*) AS c FROM sink_source_comments GROUP BY LEFT(key1, 2), LEFT(f1, 1), f2 / 100

                > COMMENT ON MATERIALIZED VIEW sink_source_comments_view IS 'comment on view sink_source_comments_view'

                > CREATE SINK sink_sink_comments1 FROM sink_source_comments_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments1')
                  KEY (l_v2) NOT ENFORCED
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ( NULL DEFAULTS,
                    DOC ON COLUMN sink_source_comments_view.l_v1 = 'doc on l_v1',
                    VALUE DOC ON COLUMN sink_source_comments_view.l_v2 = 'value doc on l_v2',
                    KEY DOC ON COLUMN sink_source_comments_view.l_v2 = 'key doc on l_v2'
                  )
                  ENVELOPE DEBEZIUM
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(schemas_null() + dedent(s))
            for s in [
                """
                $[version>=5200] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_comments_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize

                $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "I2${kafka-ingest.iteration}"} {"f1": {"string": "B${kafka-ingest.iteration}"}, "f2": null}
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
                {"key1": "D2${kafka-ingest.iteration}"}

                > CREATE SINK sink_sink_comments2 FROM sink_source_comments_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments2')
                  KEY (l_v2) NOT ENFORCED
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ( NULL DEFAULTS,
                    DOC ON COLUMN sink_source_comments_view.l_v1 = 'doc on l_v1',
                    VALUE DOC ON COLUMN sink_source_comments_view.l_v2 = 'value doc on l_v2',
                    KEY DOC ON COLUMN sink_source_comments_view.l_v2 = 'key doc on l_v2'
                  )
                  ENVELOPE DEBEZIUM
                """,
                """
                $[version>=5200] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_comments_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize

                $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "I2${kafka-ingest.iteration}"} {"f1": {"string": "B${kafka-ingest.iteration}"}, "f2": null}
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
                {"key1": "D2${kafka-ingest.iteration}"}

                > CREATE SINK sink_sink_comments3 FROM sink_source_comments_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments3')
                  KEY (l_v2) NOT ENFORCED
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ( NULL DEFAULTS,
                    DOC ON COLUMN sink_source_comments_view.l_v1 = 'doc on l_v1',
                    VALUE DOC ON COLUMN sink_source_comments_view.l_v2 = 'value doc on l_v2',
                    KEY DOC ON COLUMN sink_source_comments_view.l_v2 = 'key doc on l_v2'
                  )
                  ENVELOPE DEBEZIUM
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                $ schema-registry-verify schema-type=avro subject=sink-sink-comments1-key
                {"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_v2","type":["null","long"],"default":null,"doc":"key doc on l_v2"}]}

                $ schema-registry-verify schema-type=avro subject=sink-sink-comments2-key
                {"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_v2","type":["null","long"],"default":null,"doc":"key doc on l_v2"}]}

                $ schema-registry-verify schema-type=avro subject=sink-sink-comments3-key
                {"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_v2","type":["null","long"],"default":null,"doc":"key doc on l_v2"}]}

                $ schema-registry-verify schema-type=avro subject=sink-sink-comments1-value
                {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v2"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}

                $ schema-registry-verify schema-type=avro subject=sink-sink-comments2-value
                {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v2"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}

                $ schema-registry-verify schema-type=avro subject=sink-sink-comments3-value
                {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v2"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}

                $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_comments_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize

                $[version>=5900] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT CREATECLUSTER ON SYSTEM TO materialize

                $[version<5900] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                ALTER ROLE materialize CREATECLUSTER

                > SELECT * FROM sink_source_comments_view;
                D3 <null> 0 100
                D3 <null> 1 100
                D3 <null> 2 100
                D3 <null> 3 100
                D3 <null> 4 100
                D3 <null> 5 100
                D3 <null> 6 100
                D3 <null> 7 100
                D3 <null> 8 100
                D3 <null> 9 100
                I2 B <null> 1000
                U2 <null> 0 100
                U2 <null> 1 100
                U2 <null> 2 100
                U2 <null> 3 100
                U2 <null> 4 100
                U2 <null> 5 100
                U2 <null> 6 100
                U2 <null> 7 100
                U2 <null> 8 100
                U2 <null> 9 100
                U3 A <null> 1000

                # We check the contents of the sink topics by re-ingesting them.

                > CREATE SOURCE sink_view_comments1
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments1')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE

                > CREATE SOURCE sink_view_comments2
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments2')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE

                > CREATE SOURCE sink_view_comments3
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments3')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE

                # Validate the sink by aggregating all the 'before' and 'after' records using SQL
                > SELECT l_v1, l_v2, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_comments1
                    UNION ALL
                    SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_comments1
                  ) GROUP BY l_v1, l_v2, l_k
                  HAVING SUM(c) > 0;
                <null> 0 D3 100
                <null> 0 U2 100
                <null> 1 D3 100
                <null> 1 U2 100
                <null> 2 D3 100
                <null> 2 U2 100
                <null> 3 D3 100
                <null> 3 U2 100
                <null> 4 D3 100
                <null> 4 U2 100
                <null> 5 D3 100
                <null> 5 U2 100
                <null> 6 D3 100
                <null> 6 U2 100
                <null> 7 D3 100
                <null> 7 U2 100
                <null> 8 D3 100
                <null> 8 U2 100
                <null> 9 D3 100
                <null> 9 U2 100
                A <null> U3 1000
                B <null> I2 1000

                > SELECT l_v1, l_v2, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_comments2
                    UNION ALL
                    SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_comments2
                  ) GROUP BY l_v1, l_v2, l_k
                  HAVING SUM(c) > 0;
                <null> 0 D3 100
                <null> 0 U2 100
                <null> 1 D3 100
                <null> 1 U2 100
                <null> 2 D3 100
                <null> 2 U2 100
                <null> 3 D3 100
                <null> 3 U2 100
                <null> 4 D3 100
                <null> 4 U2 100
                <null> 5 D3 100
                <null> 5 U2 100
                <null> 6 D3 100
                <null> 6 U2 100
                <null> 7 D3 100
                <null> 7 U2 100
                <null> 8 D3 100
                <null> 8 U2 100
                <null> 9 D3 100
                <null> 9 U2 100
                A <null> U3 1000
                B <null> I2 1000

                > SELECT l_v1, l_v2, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_comments3
                    UNION ALL
                    SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_comments3
                  ) GROUP BY l_v1, l_v2, l_k
                  HAVING SUM(c) > 0;
                <null> 0 D3 100
                <null> 0 U2 100
                <null> 1 D3 100
                <null> 1 U2 100
                <null> 2 D3 100
                <null> 2 U2 100
                <null> 3 D3 100
                <null> 3 U2 100
                <null> 4 D3 100
                <null> 4 U2 100
                <null> 5 D3 100
                <null> 5 U2 100
                <null> 6 D3 100
                <null> 6 U2 100
                <null> 7 D3 100
                <null> 7 U2 100
                <null> 8 D3 100
                <null> 8 U2 100
                <null> 9 D3 100
                <null> 9 U2 100
                A <null> U3 1000
                B <null> I2 1000

                > DROP SOURCE sink_view_comments1 CASCADE;

                > DROP SOURCE sink_view_comments2 CASCADE;

                > DROP SOURCE sink_view_comments3 CASCADE;
            """
            )
        )

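A minimal sketch of how one of these checks is driven; the driver loop below
is an assumption for illustration (real scenarios live elsewhere in
materialize.checks), but the constructor and the three phase methods match the
class shown above.

import random

from materialize.checks.all_checks.sink import SinkUpsert
from materialize.mz_version import MzVersion

# Hypothetical driver loop for illustration only.
check = SinkUpsert(base_version=MzVersion.parse_mz("v0.71.0"), rng=random.Random(0))

actions = [check.initialize(), *check.manipulate(), check.validate()]
for action in actions:
    # Each Testdrive action wraps a testdrive script; a scenario would execute
    # it against a running Materialize, possibly upgrading between phases.
    print(type(action).__name__)
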
Functions

def schemas() -> str
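Return KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD, dedented: the shared Testdrive
preamble defining the Avro key and value schemas used by SinkUpsert and
SinkTables.
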
def schemas_null() -> str
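Return a Testdrive preamble defining a string key and a value record with two
nullable fields (f1: null | string, f2: long | null); used by SinkNullDefaults
and SinkComments.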

Classes

class SinkComments(base_version: MzVersion, rng: random.Random | None)

Check on an Avro sink with comments

Ancestors
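materialize.checks.checks.Check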

Class variables

var externally_idempotent : bool

Methods

def initialize(self) -> Testdrive
def manipulate(self) -> list[Testdrive]
def manipulate(self) -> list[Testdrive]:
    return [
        Testdrive(schemas_null() + dedent(s))
        for s in [
            """
            $[version>=5200] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
            GRANT SELECT ON sink_source_comments_view TO materialize
            GRANT USAGE ON CONNECTION kafka_conn TO materialize
            GRANT USAGE ON CONNECTION csr_conn TO materialize

            $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
            {"key1": "I2${kafka-ingest.iteration}"} {"f1": {"string": "B${kafka-ingest.iteration}"}, "f2": null}
            {"key1": "U2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
            {"key1": "D2${kafka-ingest.iteration}"}

            > CREATE SINK sink_sink_comments2 FROM sink_source_comments_view
              INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments2')
              KEY (l_v2) NOT ENFORCED
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ( NULL DEFAULTS,
                DOC ON COLUMN sink_source_comments_view.l_v1 = 'doc on l_v1',
                VALUE DOC ON COLUMN sink_source_comments_view.l_v2 = 'value doc on l_v2',
                KEY DOC ON COLUMN sink_source_comments_view.l_v2 = 'key doc on l_v2'
              )
              ENVELOPE DEBEZIUM
            """,
            """
            $[version>=5200] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
            GRANT SELECT ON sink_source_comments_view TO materialize
            GRANT USAGE ON CONNECTION kafka_conn TO materialize
            GRANT USAGE ON CONNECTION csr_conn TO materialize

            $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
            {"key1": "I2${kafka-ingest.iteration}"} {"f1": {"string": "B${kafka-ingest.iteration}"}, "f2": null}
            {"key1": "U2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
            {"key1": "D2${kafka-ingest.iteration}"}

            > CREATE SINK sink_sink_comments3 FROM sink_source_comments_view
              INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments3')
              KEY (l_v2) NOT ENFORCED
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ( NULL DEFAULTS,
                DOC ON COLUMN sink_source_comments_view.l_v1 = 'doc on l_v1',
                VALUE DOC ON COLUMN sink_source_comments_view.l_v2 = 'value doc on l_v2',
                KEY DOC ON COLUMN sink_source_comments_view.l_v2 = 'key doc on l_v2'
              )
              ENVELOPE DEBEZIUM
            """,
        ]
    ]
def validate(self) -> Testdrive
def validate(self) -> Testdrive:
    return Testdrive(
        dedent(
            """
            $ schema-registry-verify schema-type=avro subject=sink-sink-comments1-key
            {"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_v2","type":["null","long"],"default":null,"doc":"key doc on l_v2"}]}

            $ schema-registry-verify schema-type=avro subject=sink-sink-comments2-key
            {"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_v2","type":["null","long"],"default":null,"doc":"key doc on l_v2"}]}

            $ schema-registry-verify schema-type=avro subject=sink-sink-comments3-key
            {"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_v2","type":["null","long"],"default":null,"doc":"key doc on l_v2"}]}

            $ schema-registry-verify schema-type=avro subject=sink-sink-comments1-value
            {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v2"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}

            $ schema-registry-verify schema-type=avro subject=sink-sink-comments2-value
            {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v2"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}

            $ schema-registry-verify schema-type=avro subject=sink-sink-comments3-value
            {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v2"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}

            $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
            GRANT SELECT ON sink_source_comments_view TO materialize
            GRANT USAGE ON CONNECTION kafka_conn TO materialize
            GRANT USAGE ON CONNECTION csr_conn TO materialize

            $[version>=5900] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
            GRANT CREATECLUSTER ON SYSTEM TO materialize

            $[version<5900] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
            ALTER ROLE materialize CREATECLUSTER

            > SELECT * FROM sink_source_comments_view;
            D3 <null> 0 100
            D3 <null> 1 100
            D3 <null> 2 100
            D3 <null> 3 100
            D3 <null> 4 100
            D3 <null> 5 100
            D3 <null> 6 100
            D3 <null> 7 100
            D3 <null> 8 100
            D3 <null> 9 100
            I2 B <null> 1000
            U2 <null> 0 100
            U2 <null> 1 100
            U2 <null> 2 100
            U2 <null> 3 100
            U2 <null> 4 100
            U2 <null> 5 100
            U2 <null> 6 100
            U2 <null> 7 100
            U2 <null> 8 100
            U2 <null> 9 100
            U3 A <null> 1000

            # We check the contents of the sink topics by re-ingesting them.

            > CREATE SOURCE sink_view_comments1
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments1')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE NONE

            > CREATE SOURCE sink_view_comments2
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments2')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE NONE

            > CREATE SOURCE sink_view_comments3
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments3')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE NONE
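
            # ENVELOPE NONE ingests the raw Debezium envelope, so each row
            # carries record-typed 'before' and 'after' columns; the queries
            # below unpack them with the (record).field syntax.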

            # Validate the sink by aggregating all the 'before' and 'after' records using SQL
            > SELECT l_v1, l_v2, l_k, SUM(c)
              FROM (
                SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_comments1
                UNION ALL
                SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_comments1
              ) GROUP BY l_v1, l_v2, l_k
              HAVING SUM(c) > 0;
            <null> 0 D3 100
            <null> 0 U2 100
            <null> 1 D3 100
            <null> 1 U2 100
            <null> 2 D3 100
            <null> 2 U2 100
            <null> 3 D3 100
            <null> 3 U2 100
            <null> 4 D3 100
            <null> 4 U2 100
            <null> 5 D3 100
            <null> 5 U2 100
            <null> 6 D3 100
            <null> 6 U2 100
            <null> 7 D3 100
            <null> 7 U2 100
            <null> 8 D3 100
            <null> 8 U2 100
            <null> 9 D3 100
            <null> 9 U2 100
            A <null> U3 1000
            B <null> I2 1000

            > SELECT l_v1, l_v2, l_k, SUM(c)
              FROM (
                SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_comments2
                UNION ALL
                SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_comments2
              ) GROUP BY l_v1, l_v2, l_k
              HAVING SUM(c) > 0;
            <null> 0 D3 100
            <null> 0 U2 100
            <null> 1 D3 100
            <null> 1 U2 100
            <null> 2 D3 100
            <null> 2 U2 100
            <null> 3 D3 100
            <null> 3 U2 100
            <null> 4 D3 100
            <null> 4 U2 100
            <null> 5 D3 100
            <null> 5 U2 100
            <null> 6 D3 100
            <null> 6 U2 100
            <null> 7 D3 100
            <null> 7 U2 100
            <null> 8 D3 100
            <null> 8 U2 100
            <null> 9 D3 100
            <null> 9 U2 100
            A <null> U3 1000
            B <null> I2 1000

            > SELECT l_v1, l_v2, l_k, SUM(c)
              FROM (
                SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_comments3
                UNION ALL
                SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_comments3
              ) GROUP BY l_v1, l_v2, l_k
              HAVING SUM(c) > 0;
            <null> 0 D3 100
            <null> 0 U2 100
            <null> 1 D3 100
            <null> 1 U2 100
            <null> 2 D3 100
            <null> 2 U2 100
            <null> 3 D3 100
            <null> 3 U2 100
            <null> 4 D3 100
            <null> 4 U2 100
            <null> 5 D3 100
            <null> 5 U2 100
            <null> 6 D3 100
            <null> 6 U2 100
            <null> 7 D3 100
            <null> 7 U2 100
            <null> 8 D3 100
            <null> 8 U2 100
            <null> 9 D3 100
            <null> 9 U2 100
            A <null> U3 1000
            B <null> I2 1000

            > DROP SOURCE sink_view_comments1 CASCADE;

            > DROP SOURCE sink_view_comments2 CASCADE;

            > DROP SOURCE sink_view_comments3 CASCADE;
        """
        )
    )
class SinkNullDefaults (base_version: MzVersion, rng: random.Random | None)

Check on an Avro sink with NULL DEFAULTS
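
The effect of the option is visible in the schema-registry-verify expectations below: every nullable column is encoded as a null-first union carrying an explicit null default, e.g.

    {"name":"l_v1","type":["null","string"],"default":null}

whereas the SinkTables check further down, which sinks without NULL DEFAULTS, expects plain unions such as {"name":"f2","type":["null","string"]} with no default.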

@externally_idempotent(False)
class SinkNullDefaults(Check):
    """Check on an Avro sink with NULL DEFAULTS"""

    def _can_run(self, e: Executor) -> bool:
        return self.base_version >= MzVersion.parse_mz("v0.71.0-dev")

    def initialize(self) -> Testdrive:
        return Testdrive(
            schemas_null()
            + dedent(
                """
                $ kafka-create-topic topic=sink-source-null

                $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": {"string": "A${kafka-ingest.iteration}"}, "f2": null}

                $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "D2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}

                $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "U3${kafka-ingest.iteration}"} {"f1": {"string": "A${kafka-ingest.iteration}"}, "f2": null}

                $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "D3${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}

                > CREATE SOURCE sink_source_null
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-source-null-${testdrive.seed}')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE UPSERT

                > CREATE MATERIALIZED VIEW sink_source_null_view AS SELECT LEFT(key1, 2) as l_k, LEFT(f1, 1) AS l_v1, f2 / 100 AS l_v2, COUNT(*) AS c FROM sink_source_null GROUP BY LEFT(key1, 2), LEFT(f1, 1), f2 / 100;

                > CREATE SINK sink_sink_null1 FROM sink_source_null_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null1')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ( NULL DEFAULTS )
                  ENVELOPE DEBEZIUM
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(schemas_null() + dedent(s))
            for s in [
                """
                $[version>=5200] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_null_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize

                $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "I2${kafka-ingest.iteration}"} {"f1": {"string": "B${kafka-ingest.iteration}"}, "f2": null}
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
                {"key1": "D2${kafka-ingest.iteration}"}

                > CREATE SINK sink_sink_null2 FROM sink_source_null_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null2')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ( NULL DEFAULTS )
                  ENVELOPE DEBEZIUM
                """,
                """
                $[version>=5200] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_null_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize

                $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "I2${kafka-ingest.iteration}"} {"f1": {"string": "B${kafka-ingest.iteration}"}, "f2": null}
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
                {"key1": "D2${kafka-ingest.iteration}"}

                > CREATE SINK sink_sink_null3 FROM sink_source_null_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null3')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ( NULL DEFAULTS )
                  ENVELOPE DEBEZIUM
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                $ schema-registry-verify schema-type=avro subject=sink-sink-null1-value
                {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null},{"name":"l_v2","type":["null","long"],"default":null},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}

                $ schema-registry-verify schema-type=avro subject=sink-sink-null2-value
                {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null},{"name":"l_v2","type":["null","long"],"default":null},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}

                $ schema-registry-verify schema-type=avro subject=sink-sink-null3-value
                {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null},{"name":"l_v2","type":["null","long"],"default":null},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}

                $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_null_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize

                $[version>=5900] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT CREATECLUSTER ON SYSTEM TO materialize

                $[version<5900] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                ALTER ROLE materialize CREATECLUSTER

                > SELECT * FROM sink_source_null_view;
                D3 <null> 0 100
                D3 <null> 1 100
                D3 <null> 2 100
                D3 <null> 3 100
                D3 <null> 4 100
                D3 <null> 5 100
                D3 <null> 6 100
                D3 <null> 7 100
                D3 <null> 8 100
                D3 <null> 9 100
                I2 B <null> 1000
                U2 <null> 0 100
                U2 <null> 1 100
                U2 <null> 2 100
                U2 <null> 3 100
                U2 <null> 4 100
                U2 <null> 5 100
                U2 <null> 6 100
                U2 <null> 7 100
                U2 <null> 8 100
                U2 <null> 9 100
                U3 A <null> 1000

                # We check the contents of the sink topics by re-ingesting them.

                > CREATE SOURCE sink_view_null1
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null1')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE

                > CREATE SOURCE sink_view_null2
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null2')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE

                > CREATE SOURCE sink_view_null3
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null3')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE

                # Validate the sink by aggregating all the 'before' and 'after' records using SQL
                > SELECT l_v1, l_v2, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_null1
                    UNION ALL
                    SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_null1
                  ) GROUP BY l_v1, l_v2, l_k
                  HAVING SUM(c) > 0;
                <null> 0 D3 100
                <null> 0 U2 100
                <null> 1 D3 100
                <null> 1 U2 100
                <null> 2 D3 100
                <null> 2 U2 100
                <null> 3 D3 100
                <null> 3 U2 100
                <null> 4 D3 100
                <null> 4 U2 100
                <null> 5 D3 100
                <null> 5 U2 100
                <null> 6 D3 100
                <null> 6 U2 100
                <null> 7 D3 100
                <null> 7 U2 100
                <null> 8 D3 100
                <null> 8 U2 100
                <null> 9 D3 100
                <null> 9 U2 100
                A <null> U3 1000
                B <null> I2 1000

                > SELECT l_v1, l_v2, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_null2
                    UNION ALL
                    SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_null2
                  ) GROUP BY l_v1, l_v2, l_k
                  HAVING SUM(c) > 0;
                <null> 0 D3 100
                <null> 0 U2 100
                <null> 1 D3 100
                <null> 1 U2 100
                <null> 2 D3 100
                <null> 2 U2 100
                <null> 3 D3 100
                <null> 3 U2 100
                <null> 4 D3 100
                <null> 4 U2 100
                <null> 5 D3 100
                <null> 5 U2 100
                <null> 6 D3 100
                <null> 6 U2 100
                <null> 7 D3 100
                <null> 7 U2 100
                <null> 8 D3 100
                <null> 8 U2 100
                <null> 9 D3 100
                <null> 9 U2 100
                A <null> U3 1000
                B <null> I2 1000

                > SELECT l_v1, l_v2, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_null3
                    UNION ALL
                    SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_null3
                  ) GROUP BY l_v1, l_v2, l_k
                  HAVING SUM(c) > 0;
                <null> 0 D3 100
                <null> 0 U2 100
                <null> 1 D3 100
                <null> 1 U2 100
                <null> 2 D3 100
                <null> 2 U2 100
                <null> 3 D3 100
                <null> 3 U2 100
                <null> 4 D3 100
                <null> 4 U2 100
                <null> 5 D3 100
                <null> 5 U2 100
                <null> 6 D3 100
                <null> 6 U2 100
                <null> 7 D3 100
                <null> 7 U2 100
                <null> 8 D3 100
                <null> 8 U2 100
                <null> 9 D3 100
                <null> 9 U2 100
                A <null> U3 1000
                B <null> I2 1000

                > DROP SOURCE sink_view_null1 CASCADE;

                > DROP SOURCE sink_view_null2 CASCADE;

                > DROP SOURCE sink_view_null3 CASCADE;
            """
            )
        )

Ancestors

materialize.checks.checks.Check

Class variables

var externally_idempotent : bool

Methods

def initialize(self) -> Testdrive
def initialize(self) -> Testdrive:
    return Testdrive(
        schemas_null()
        + dedent(
            """
            $ kafka-create-topic topic=sink-source-null

            $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
            {"key1": "U2${kafka-ingest.iteration}"} {"f1": {"string": "A${kafka-ingest.iteration}"}, "f2": null}

            $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
            {"key1": "D2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}

            $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
            {"key1": "U3${kafka-ingest.iteration}"} {"f1": {"string": "A${kafka-ingest.iteration}"}, "f2": null}

            $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
            {"key1": "D3${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}

            > CREATE SOURCE sink_source_null
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-source-null-${testdrive.seed}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE UPSERT

            > CREATE MATERIALIZED VIEW sink_source_null_view AS SELECT LEFT(key1, 2) as l_k, LEFT(f1, 1) AS l_v1, f2 / 100 AS l_v2, COUNT(*) AS c FROM sink_source_null GROUP BY LEFT(key1, 2), LEFT(f1, 1), f2 / 100;

            > CREATE SINK sink_sink_null1 FROM sink_source_null_view
              INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null1')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ( NULL DEFAULTS )
              ENVELOPE DEBEZIUM
            """
        )
    )
def manipulate(self) -> list[Testdrive]
def manipulate(self) -> list[Testdrive]:
    return [
        Testdrive(schemas_null() + dedent(s))
        for s in [
            """
            $[version>=5200] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
            GRANT SELECT ON sink_source_null_view TO materialize
            GRANT USAGE ON CONNECTION kafka_conn TO materialize
            GRANT USAGE ON CONNECTION csr_conn TO materialize

            $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
            {"key1": "I2${kafka-ingest.iteration}"} {"f1": {"string": "B${kafka-ingest.iteration}"}, "f2": null}
            {"key1": "U2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
            {"key1": "D2${kafka-ingest.iteration}"}

            > CREATE SINK sink_sink_null2 FROM sink_source_null_view
              INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null2')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ( NULL DEFAULTS )
              ENVELOPE DEBEZIUM
            """,
            """
            $[version>=5200] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
            GRANT SELECT ON sink_source_null_view TO materialize
            GRANT USAGE ON CONNECTION kafka_conn TO materialize
            GRANT USAGE ON CONNECTION csr_conn TO materialize

            $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
            {"key1": "I2${kafka-ingest.iteration}"} {"f1": {"string": "B${kafka-ingest.iteration}"}, "f2": null}
            {"key1": "U2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
            {"key1": "D2${kafka-ingest.iteration}"}

            > CREATE SINK sink_sink_null3 FROM sink_source_null_view
              INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null3')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ( NULL DEFAULTS )
              ENVELOPE DEBEZIUM
            """,
        ]
    ]
def validate(self) -> Testdrive
def validate(self) -> Testdrive:
    return Testdrive(
        dedent(
            """
            $ schema-registry-verify schema-type=avro subject=sink-sink-null1-value
            {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null},{"name":"l_v2","type":["null","long"],"default":null},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}

            $ schema-registry-verify schema-type=avro subject=sink-sink-null2-value
            {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null},{"name":"l_v2","type":["null","long"],"default":null},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}

            $ schema-registry-verify schema-type=avro subject=sink-sink-null3-value
            {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null},{"name":"l_v2","type":["null","long"],"default":null},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}

            $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
            GRANT SELECT ON sink_source_null_view TO materialize
            GRANT USAGE ON CONNECTION kafka_conn TO materialize
            GRANT USAGE ON CONNECTION csr_conn TO materialize

            $[version>=5900] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
            GRANT CREATECLUSTER ON SYSTEM TO materialize

            $[version<5900] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
            ALTER ROLE materialize CREATECLUSTER

            > SELECT * FROM sink_source_null_view;
            D3 <null> 0 100
            D3 <null> 1 100
            D3 <null> 2 100
            D3 <null> 3 100
            D3 <null> 4 100
            D3 <null> 5 100
            D3 <null> 6 100
            D3 <null> 7 100
            D3 <null> 8 100
            D3 <null> 9 100
            I2 B <null> 1000
            U2 <null> 0 100
            U2 <null> 1 100
            U2 <null> 2 100
            U2 <null> 3 100
            U2 <null> 4 100
            U2 <null> 5 100
            U2 <null> 6 100
            U2 <null> 7 100
            U2 <null> 8 100
            U2 <null> 9 100
            U3 A <null> 1000

            # We check the contents of the sink topics by re-ingesting them.

            > CREATE SOURCE sink_view_null1
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null1')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE NONE

            > CREATE SOURCE sink_view_null2
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null2')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE NONE

            > CREATE SOURCE sink_view_null3
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null3')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE NONE

            # Validate the sink by aggregating all the 'before' and 'after' records using SQL
            > SELECT l_v1, l_v2, l_k, SUM(c)
              FROM (
                SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_null1
                UNION ALL
                SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_null1
              ) GROUP BY l_v1, l_v2, l_k
              HAVING SUM(c) > 0;
            <null> 0 D3 100
            <null> 0 U2 100
            <null> 1 D3 100
            <null> 1 U2 100
            <null> 2 D3 100
            <null> 2 U2 100
            <null> 3 D3 100
            <null> 3 U2 100
            <null> 4 D3 100
            <null> 4 U2 100
            <null> 5 D3 100
            <null> 5 U2 100
            <null> 6 D3 100
            <null> 6 U2 100
            <null> 7 D3 100
            <null> 7 U2 100
            <null> 8 D3 100
            <null> 8 U2 100
            <null> 9 D3 100
            <null> 9 U2 100
            A <null> U3 1000
            B <null> I2 1000

            > SELECT l_v1, l_v2, l_k, SUM(c)
              FROM (
                SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_null2
                UNION ALL
                SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_null2
              ) GROUP BY l_v1, l_v2, l_k
              HAVING SUM(c) > 0;
            <null> 0 D3 100
            <null> 0 U2 100
            <null> 1 D3 100
            <null> 1 U2 100
            <null> 2 D3 100
            <null> 2 U2 100
            <null> 3 D3 100
            <null> 3 U2 100
            <null> 4 D3 100
            <null> 4 U2 100
            <null> 5 D3 100
            <null> 5 U2 100
            <null> 6 D3 100
            <null> 6 U2 100
            <null> 7 D3 100
            <null> 7 U2 100
            <null> 8 D3 100
            <null> 8 U2 100
            <null> 9 D3 100
            <null> 9 U2 100
            A <null> U3 1000
            B <null> I2 1000

            > SELECT l_v1, l_v2, l_k, SUM(c)
              FROM (
                SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_null3
                UNION ALL
                SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_null3
              ) GROUP BY l_v1, l_v2, l_k
              HAVING SUM(c) > 0;
            <null> 0 D3 100
            <null> 0 U2 100
            <null> 1 D3 100
            <null> 1 U2 100
            <null> 2 D3 100
            <null> 2 U2 100
            <null> 3 D3 100
            <null> 3 U2 100
            <null> 4 D3 100
            <null> 4 U2 100
            <null> 5 D3 100
            <null> 5 U2 100
            <null> 6 D3 100
            <null> 6 U2 100
            <null> 7 D3 100
            <null> 7 U2 100
            <null> 8 D3 100
            <null> 8 U2 100
            <null> 9 D3 100
            <null> 9 U2 100
            A <null> U3 1000
            B <null> I2 1000

            > DROP SOURCE sink_view_null1 CASCADE;

            > DROP SOURCE sink_view_null2 CASCADE;

            > DROP SOURCE sink_view_null3 CASCADE;
        """
        )
    )
class SinkTables (base_version: MzVersion, rng: random.Random | None)

Sink and re-ingest a large transaction from a table source
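
The expected counts in validate() follow from the mix of records the sink writes: the counts themselves imply that each UPDATE lands as a separate retraction record (after is null) plus an insertion record (before is null), rather than as one combined before/after record. A back-of-the-envelope sketch under that assumption:

    rows = 100_000
    snapshot_inserts = rows       # initial INSERT: before is null, after is the 'x' row
    update_inserts = 2 * rows     # two whole-table UPDATEs: new 'y' and 'z' rows
    update_retracts = 2 * rows    # ...each paired with a retraction of the old row
    total = snapshot_inserts + update_inserts + update_retracts  # 500000 = COUNT(*)
    non_null_before = update_retracts                            # 200000 = COUNT((before).f1)
    non_null_after = snapshot_inserts + update_inserts           # 300000 = COUNT((after).f1)

The same bookkeeping makes sink_large_transaction_view4 net out to 0 for 'x' and 'y', +100000 for 'z', and -100000 for the null group.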

@externally_idempotent(False)
class SinkTables(Check):
    """Sink and re-ingest a large transaction from a table source"""

    def initialize(self) -> Testdrive:
        return Testdrive(
            schemas()
            + dedent(
                """
                $[version>=5500] postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
                ALTER SYSTEM SET enable_table_keys = true;
                """
            )
            + dedent(
                """
                > CREATE TABLE sink_large_transaction_table (f1 INTEGER, f2 TEXT, PRIMARY KEY (f1));
                > CREATE DEFAULT INDEX ON sink_large_transaction_table;

                > INSERT INTO sink_large_transaction_table SELECT generate_series, REPEAT('x', 1024) FROM generate_series(1, 100000);

                > CREATE MATERIALIZED VIEW sink_large_transaction_view AS SELECT f1 - 1 AS f1 , f2 FROM sink_large_transaction_table;

                > CREATE CLUSTER sink_large_transaction_sink1_cluster SIZE '4';

                > CREATE SINK sink_large_transaction_sink1
                  IN CLUSTER sink_large_transaction_sink1_cluster
                  FROM sink_large_transaction_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-large-transaction-sink-${testdrive.seed}')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM;
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(schemas() + dedent(s))
            for s in [
                """
                > UPDATE sink_large_transaction_table SET f2 = REPEAT('y', 1024)
                """,
                """
                > UPDATE sink_large_transaction_table SET f2 = REPEAT('z', 1024)
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                $ schema-registry-verify schema-type=avro subject=testdrive-sink-large-transaction-sink-${testdrive.seed}-value
                {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"f1","type":"int"},{"name":"f2","type":["null","string"]}]}]},{"name":"after","type":["null","row"]}]}

                # We check the contents of the sink topics by re-ingesting them.
                > CREATE SOURCE sink_large_transaction_source
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-large-transaction-sink-${testdrive.seed}')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE

                > CREATE MATERIALIZED VIEW sink_large_transaction_view2
                  AS
                  SELECT COUNT(*) AS c1 , COUNT(f1) AS c2, COUNT(DISTINCT f1) AS c3 , MIN(f1), MAX(f1)
                  FROM (
                    SELECT (before).f1, (before).f2 FROM sink_large_transaction_source
                  )

                > CREATE MATERIALIZED VIEW sink_large_transaction_view3
                  AS
                  SELECT COUNT(*) AS c1 , COUNT(f1) AS c2, COUNT(DISTINCT f1) AS c3 , MIN(f1), MAX(f1)
                  FROM (
                    SELECT (after).f1, (after).f2 FROM sink_large_transaction_source
                  )

                > CREATE MATERIALIZED VIEW sink_large_transaction_view4
                  AS
                  SELECT LEFT(f2, 1), SUM(c)
                  FROM (
                    SELECT (after).f2, COUNT(*) AS c FROM sink_large_transaction_source GROUP BY (after).f2
                    UNION ALL
                    SELECT (before).f2, -COUNT(*) AS c  FROM sink_large_transaction_source GROUP BY (before).f2
                  )
                  GROUP BY f2

                > SELECT * FROM sink_large_transaction_view2
                500000 200000 100000 0 99999

                > SELECT * FROM sink_large_transaction_view3
                500000 300000 100000 0 99999

                > SELECT * FROM sink_large_transaction_view4
                <null> -100000
                x 0
                y 0
                z 100000

                > DROP SOURCE sink_large_transaction_source CASCADE;
            """
            )
        )

Ancestors

materialize.checks.checks.Check

Class variables

var externally_idempotent : bool

Methods

def initialize(self) -> Testdrive
def initialize(self) -> Testdrive:
    return Testdrive(
        schemas()
        + dedent(
            """
            $[version>=5500] postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
            ALTER SYSTEM SET enable_table_keys = true;
            """
        )
        + dedent(
            """
            > CREATE TABLE sink_large_transaction_table (f1 INTEGER, f2 TEXT, PRIMARY KEY (f1));
            > CREATE DEFAULT INDEX ON sink_large_transaction_table;

            > INSERT INTO sink_large_transaction_table SELECT generate_series, REPEAT('x', 1024) FROM generate_series(1, 100000);

            > CREATE MATERIALIZED VIEW sink_large_transaction_view AS SELECT f1 - 1 AS f1 , f2 FROM sink_large_transaction_table;

            > CREATE CLUSTER sink_large_transaction_sink1_cluster SIZE '4';

            > CREATE SINK sink_large_transaction_sink1
              IN CLUSTER sink_large_transaction_sink1_cluster
              FROM sink_large_transaction_view
              INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-large-transaction-sink-${testdrive.seed}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE DEBEZIUM;
            """
        )
    )
def manipulate(self) -> list[Testdrive]
def manipulate(self) -> list[Testdrive]:
    return [
        Testdrive(schemas() + dedent(s))
        for s in [
            """
            > UPDATE sink_large_transaction_table SET f2 = REPEAT('y', 1024)
            """,
            """
            > UPDATE sink_large_transaction_table SET f2 = REPEAT('z', 1024)
            """,
        ]
    ]
def validate(self) -> Testdrive
def validate(self) -> Testdrive:
    return Testdrive(
        dedent(
            """
            $ schema-registry-verify schema-type=avro subject=testdrive-sink-large-transaction-sink-${testdrive.seed}-value
            {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"f1","type":"int"},{"name":"f2","type":["null","string"]}]}]},{"name":"after","type":["null","row"]}]}

            # We check the contents of the sink topics by re-ingesting them.
            > CREATE SOURCE sink_large_transaction_source
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-large-transaction-sink-${testdrive.seed}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE NONE

            > CREATE MATERIALIZED VIEW sink_large_transaction_view2
              AS
              SELECT COUNT(*) AS c1 , COUNT(f1) AS c2, COUNT(DISTINCT f1) AS c3 , MIN(f1), MAX(f1)
              FROM (
                SELECT (before).f1, (before).f2 FROM sink_large_transaction_source
              )

            > CREATE MATERIALIZED VIEW sink_large_transaction_view3
              AS
              SELECT COUNT(*) AS c1 , COUNT(f1) AS c2, COUNT(DISTINCT f1) AS c3 , MIN(f1), MAX(f1)
              FROM (
                SELECT (after).f1, (after).f2 FROM sink_large_transaction_source
              )

            > CREATE MATERIALIZED VIEW sink_large_transaction_view4
              AS
              SELECT LEFT(f2, 1), SUM(c)
              FROM (
                SELECT (after).f2, COUNT(*) AS c FROM sink_large_transaction_source GROUP BY (after).f2
                UNION ALL
                SELECT (before).f2, -COUNT(*) AS c  FROM sink_large_transaction_source GROUP BY (before).f2
              )
              GROUP BY f2

            > SELECT * FROM sink_large_transaction_view2
            500000 200000 100000 0 99999

            > SELECT * FROM sink_large_transaction_view3
            500000 300000 100000 0 99999

            > SELECT * FROM sink_large_transaction_view4
            <null> -100000
            x 0
            y 0
            z 100000

            > DROP SOURCE sink_large_transaction_source CASCADE;
        """
        )
    )
class SinkUpsert (base_version: MzVersion, rng: random.Random | None)

Basic Check on sinks from an upsert source
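
The validation queries in this check exploit a telescoping property of the Debezium envelope: adding c for every after record and subtracting it for every before record cancels all superseded states, so the aggregate equals the view's final contents, and HAVING SUM(c) > 0 discards fully retracted groups. A minimal Python sketch of the same bookkeeping over a hypothetical record stream:

    # Toy Debezium-style records for one group (hypothetical values):
    records = [
        (None, ("U2", "A", 1000)),    # initial insert
        (("U2", "A", 1000), None),    # retraction when the value changes
        (None, ("U2", "B", 1000)),    # insertion of the new value
    ]
    totals: dict[tuple[str, str], int] = {}
    for before, after in records:
        if after is not None:
            k, v, c = after
            totals[(v, k)] = totals.get((v, k), 0) + c
        if before is not None:
            k, v, c = before
            totals[(v, k)] = totals.get((v, k), 0) - c
    final = {g: c for g, c in totals.items() if c > 0}
    assert final == {("B", "U2"): 1000}   # only the live state survives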

@externally_idempotent(False)
class SinkUpsert(Check):
    """Basic Check on sinks from an upsert source"""

    def initialize(self) -> Testdrive:
        return Testdrive(
            schemas()
            + dedent(
                """
                $ kafka-create-topic topic=sink-source

                $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}

                $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "D2${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}

                $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "U3${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}

                $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "D3${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}

                > CREATE SOURCE sink_source
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-source-${testdrive.seed}')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE UPSERT

                > CREATE MATERIALIZED VIEW sink_source_view AS SELECT LEFT(key1, 2) as l_k, LEFT(f1, 1) AS l_v, COUNT(*) AS c FROM sink_source GROUP BY LEFT(key1, 2), LEFT(f1, 1);

                > CREATE SINK sink_sink1 FROM sink_source_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink1')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(schemas() + dedent(s))
            for s in [
                """
                $[version>=5200] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize

                $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "I2${kafka-ingest.iteration}"} {"f1": "B${kafka-ingest.iteration}"}
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": "B${kafka-ingest.iteration}"}
                {"key1": "D2${kafka-ingest.iteration}"}

                > CREATE SINK sink_sink2 FROM sink_source_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink2')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM
                """,
                """
                $[version>=5200] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize

                $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "I3${kafka-ingest.iteration}"} {"f1": "C${kafka-ingest.iteration}"}
                {"key1": "U3${kafka-ingest.iteration}"} {"f1": "C${kafka-ingest.iteration}"}
                {"key1": "D3${kafka-ingest.iteration}"}

                > CREATE SINK sink_sink3 FROM sink_source_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink3')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize

                $[version>=5900] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT CREATECLUSTER ON SYSTEM TO materialize

                $[version<5900] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                ALTER ROLE materialize CREATECLUSTER

                > SELECT * FROM sink_source_view;
                I2 B 1000
                I3 C 1000
                U2 B 1000
                U3 C 1000

                # We check the contents of the sink topics by re-ingesting them.

                > CREATE SOURCE sink_view1
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink1')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE

                > CREATE SOURCE sink_view2
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink2')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE

                > CREATE SOURCE sink_view3
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink3')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE

                # Validate the sink by aggregating all the 'before' and 'after' records using SQL
                > SELECT l_v, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v, (after).l_k, (after).c FROM sink_view1
                    UNION ALL
                    SELECT (before).l_v, (before).l_k, -(before).c FROM sink_view1
                  ) GROUP BY l_v, l_k
                  HAVING SUM(c) > 0;
                B I2 1000
                B U2 1000
                C I3 1000
                C U3 1000

                > SELECT l_v, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v, (after).l_k, (after).c FROM sink_view2
                    UNION ALL
                    SELECT (before).l_v, (before).l_k, -(before).c FROM sink_view2
                  ) GROUP BY l_v, l_k
                  HAVING SUM(c) > 0;
                B I2 1000
                B U2 1000
                C I3 1000
                C U3 1000

                > SELECT l_v, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v, (after).l_k, (after).c FROM sink_view3
                    UNION ALL
                    SELECT (before).l_v, (before).l_k, -(before).c FROM sink_view3
                  ) GROUP BY l_v, l_k
                  HAVING SUM(c) > 0;
                B I2 1000
                B U2 1000
                C I3 1000
                C U3 1000

                > DROP SOURCE sink_view1 CASCADE;

                > DROP SOURCE sink_view2 CASCADE;

                > DROP SOURCE sink_view3 CASCADE;
            """
            )
        )

Ancestors

materialize.checks.checks.Check

Class variables

var externally_idempotent : bool

Methods

def initialize(self) -> Testdrive
def initialize(self) -> Testdrive:
    return Testdrive(
        schemas()
        + dedent(
            """
            $ kafka-create-topic topic=sink-source

            $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
            {"key1": "U2${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}

            $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
            {"key1": "D2${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}

            $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
            {"key1": "U3${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}

            $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
            {"key1": "D3${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}

            > CREATE SOURCE sink_source
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-source-${testdrive.seed}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE UPSERT

            > CREATE MATERIALIZED VIEW sink_source_view AS SELECT LEFT(key1, 2) as l_k, LEFT(f1, 1) AS l_v, COUNT(*) AS c FROM sink_source GROUP BY LEFT(key1, 2), LEFT(f1, 1);

            > CREATE SINK sink_sink1 FROM sink_source_view
              INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink1')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE DEBEZIUM
            """
        )
    )
def manipulate(self) -> list[Testdrive]
def manipulate(self) -> list[Testdrive]:
    return [
        Testdrive(schemas() + dedent(s))
        for s in [
            """
            $[version>=5200] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
            GRANT SELECT ON sink_source_view TO materialize
            GRANT USAGE ON CONNECTION kafka_conn TO materialize
            GRANT USAGE ON CONNECTION csr_conn TO materialize

            $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
            {"key1": "I2${kafka-ingest.iteration}"} {"f1": "B${kafka-ingest.iteration}"}
            {"key1": "U2${kafka-ingest.iteration}"} {"f1": "B${kafka-ingest.iteration}"}
            {"key1": "D2${kafka-ingest.iteration}"}

            > CREATE SINK sink_sink2 FROM sink_source_view
              INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink2')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE DEBEZIUM
            """,
            """
            $[version>=5200] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
            GRANT SELECT ON sink_source_view TO materialize
            GRANT USAGE ON CONNECTION kafka_conn TO materialize
            GRANT USAGE ON CONNECTION csr_conn TO materialize

            $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
            {"key1": "I3${kafka-ingest.iteration}"} {"f1": "C${kafka-ingest.iteration}"}
            {"key1": "U3${kafka-ingest.iteration}"} {"f1": "C${kafka-ingest.iteration}"}
            {"key1": "D3${kafka-ingest.iteration}"}

            > CREATE SINK sink_sink3 FROM sink_source_view
              INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink3')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE DEBEZIUM
            """,
        ]
    ]
def validate(self) -> Testdrive
def validate(self) -> Testdrive:
    return Testdrive(
        dedent(
            """
            $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
            GRANT SELECT ON sink_source_view TO materialize
            GRANT USAGE ON CONNECTION kafka_conn TO materialize
            GRANT USAGE ON CONNECTION csr_conn TO materialize

            $[version>=5900] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
            GRANT CREATECLUSTER ON SYSTEM TO materialize

            $[version<5900] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
            ALTER ROLE materialize CREATECLUSTER

            > SELECT * FROM sink_source_view;
            I2 B 1000
            I3 C 1000
            U2 B 1000
            U3 C 1000

            # We check the contents of the sink topics by re-ingesting them.

            > CREATE SOURCE sink_view1
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink1')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE NONE

            > CREATE SOURCE sink_view2
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink2')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE NONE

            > CREATE SOURCE sink_view3
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink3')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE NONE

            # Validate the sink by aggregating all the 'before' and 'after' records using SQL
            > SELECT l_v, l_k, SUM(c)
              FROM (
                SELECT (after).l_v, (after).l_k, (after).c FROM sink_view1
                UNION ALL
                SELECT (before).l_v, (before).l_k, -(before).c FROM sink_view1
              ) GROUP BY l_v, l_k
              HAVING SUM(c) > 0;
            B I2 1000
            B U2 1000
            C I3 1000
            C U3 1000

            > SELECT l_v, l_k, SUM(c)
              FROM (
                SELECT (after).l_v, (after).l_k, (after).c FROM sink_view2
                UNION ALL
                SELECT (before).l_v, (before).l_k, -(before).c FROM sink_view2
              ) GROUP BY l_v, l_k
              HAVING SUM(c) > 0;
            B I2 1000
            B U2 1000
            C I3 1000
            C U3 1000

            > SELECT l_v, l_k, SUM(c)
              FROM (
                SELECT (after).l_v, (after).l_k, (after).c FROM sink_view3
                UNION ALL
                SELECT (before).l_v, (before).l_k, -(before).c FROM sink_view3
              ) GROUP BY l_v, l_k
              HAVING SUM(c) > 0;
            B I2 1000
            B U2 1000
            C I3 1000
            C U3 1000

            > DROP SOURCE sink_view1 CASCADE;

            > DROP SOURCE sink_view2 CASCADE;

            > DROP SOURCE sink_view3 CASCADE;
        """
        )
    )