Module materialize.checks.all_checks.upsert_many_updates

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from textwrap import dedent

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD

INCREMENTS = 100000


class UpsertManyUpdates(Check):
    """Update the same row over and over"""

    def initialize(self) -> Testdrive:
        return Testdrive(
            dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
            + dedent(
                """
                $ kafka-create-topic topic=upsert-many-updates
                $ kafka-ingest format=avro key-format=avro topic=upsert-many-updates key-schema=${keyschema} schema=${schema}
                {"key1": "A"} {"f1": "0"}

                > CREATE SOURCE upsert_many_updates
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-many-updates-${testdrive.seed}')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE UPSERT

                > CREATE MATERIALIZED VIEW upsert_many_updates_view AS
                  SELECT f1 FROM upsert_many_updates
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        # Construct inputs for $kafka-ingest where every update is a separate Kafka message to be ingested
        increment1 = "\n".join(
            [f"""{{"key1": "A"}} {{"f1": "{i+1}"}}""" for i in range(INCREMENTS)]
        )
        increment2 = "\n".join(
            [
                f"""{{"key1": "A"}} {{"f1": "{INCREMENTS+i+1}"}}"""
                for i in range(INCREMENTS)
            ]
        )

        return [
            Testdrive(
                dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
                + dedent(
                    """
                    $ kafka-ingest format=avro key-format=avro topic=upsert-many-updates key-schema=${keyschema} schema=${schema}
                    """
                )
                + increment1
            ),
            Testdrive(
                dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
                + dedent(
                    """
                    $ kafka-ingest format=avro key-format=avro topic=upsert-many-updates key-schema=${keyschema} schema=${schema}
                    """
                )
                + increment2
            ),
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                f"""
                > SELECT * FROM upsert_many_updates
                A {INCREMENTS*2}
                """
            )
        )

Classes

class UpsertManyUpdates (base_version: MzVersion, rng: random.Random | None)

Update the same row over and over

Expand source code Browse git
class UpsertManyUpdates(Check):
    """Update the same row over and over"""

    def initialize(self) -> Testdrive:
        return Testdrive(
            dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
            + dedent(
                """
                $ kafka-create-topic topic=upsert-many-updates
                $ kafka-ingest format=avro key-format=avro topic=upsert-many-updates key-schema=${keyschema} schema=${schema}
                {"key1": "A"} {"f1": "0"}

                > CREATE SOURCE upsert_many_updates
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-many-updates-${testdrive.seed}')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE UPSERT

                > CREATE MATERIALIZED VIEW upsert_many_updates_view AS
                  SELECT f1 FROM upsert_many_updates
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        # Construct inputs for $kafka-ingest where every update is a separate Kafka message to be ingested
        increment1 = "\n".join(
            [f"""{{"key1": "A"}} {{"f1": "{i+1}"}}""" for i in range(INCREMENTS)]
        )
        increment2 = "\n".join(
            [
                f"""{{"key1": "A"}} {{"f1": "{INCREMENTS+i+1}"}}"""
                for i in range(INCREMENTS)
            ]
        )

        return [
            Testdrive(
                dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
                + dedent(
                    """
                    $ kafka-ingest format=avro key-format=avro topic=upsert-many-updates key-schema=${keyschema} schema=${schema}
                    """
                )
                + increment1
            ),
            Testdrive(
                dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
                + dedent(
                    """
                    $ kafka-ingest format=avro key-format=avro topic=upsert-many-updates key-schema=${keyschema} schema=${schema}
                    """
                )
                + increment2
            ),
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                f"""
                > SELECT * FROM upsert_many_updates
                A {INCREMENTS*2}
                """
            )
        )

Ancestors

Methods

def initialize(self) ‑> Testdrive
Expand source code Browse git
def initialize(self) -> Testdrive:
    return Testdrive(
        dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
        + dedent(
            """
            $ kafka-create-topic topic=upsert-many-updates
            $ kafka-ingest format=avro key-format=avro topic=upsert-many-updates key-schema=${keyschema} schema=${schema}
            {"key1": "A"} {"f1": "0"}

            > CREATE SOURCE upsert_many_updates
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-many-updates-${testdrive.seed}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE UPSERT

            > CREATE MATERIALIZED VIEW upsert_many_updates_view AS
              SELECT f1 FROM upsert_many_updates
            """
        )
    )
def manipulate(self) ‑> list[Testdrive]
Expand source code Browse git
def manipulate(self) -> list[Testdrive]:
    # Construct inputs for $kafka-ingest where every update is a separate Kafka message to be ingested
    increment1 = "\n".join(
        [f"""{{"key1": "A"}} {{"f1": "{i+1}"}}""" for i in range(INCREMENTS)]
    )
    increment2 = "\n".join(
        [
            f"""{{"key1": "A"}} {{"f1": "{INCREMENTS+i+1}"}}"""
            for i in range(INCREMENTS)
        ]
    )

    return [
        Testdrive(
            dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
            + dedent(
                """
                $ kafka-ingest format=avro key-format=avro topic=upsert-many-updates key-schema=${keyschema} schema=${schema}
                """
            )
            + increment1
        ),
        Testdrive(
            dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
            + dedent(
                """
                $ kafka-ingest format=avro key-format=avro topic=upsert-many-updates key-schema=${keyschema} schema=${schema}
                """
            )
            + increment2
        ),
    ]
def validate(self) ‑> Testdrive
Expand source code Browse git
def validate(self) -> Testdrive:
    return Testdrive(
        dedent(
            f"""
            > SELECT * FROM upsert_many_updates
            A {INCREMENTS*2}
            """
        )
    )