Module materialize.checks.all_checks.error

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from textwrap import dedent

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check, externally_idempotent


class ParseError(Check):
    def initialize(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                > CREATE TABLE parse_error_table (f1 STRING);
                > CREATE MATERIALIZED VIEW parse_error_view AS SELECT f1::INTEGER FROM parse_error_table;
                > INSERT INTO parse_error_table VALUES ('123');
            """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(s)
            for s in [
                "> INSERT INTO parse_error_table VALUES ('abc'), ('234');",
                "> INSERT INTO parse_error_table VALUES ('345'), ('klm');",
            ]
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                ! SELECT * FROM parse_error_view;
                contains: invalid input syntax for type integer
                """
            )
        )


class ParseHexError(Check):
    def initialize(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                > CREATE TABLE parse_hex_error_table (f1 STRING);
                > CREATE MATERIALIZED VIEW parse_hex_error_view AS SELECT decode(f1, 'hex') FROM parse_hex_error_table;
                > INSERT INTO parse_hex_error_table VALUES ('aa');
            """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(s)
            for s in [
                "> INSERT INTO parse_hex_error_table VALUES ('bb'), ('xx');",
                "> INSERT INTO parse_hex_error_table VALUES ('yy'), ('cc');",
            ]
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                ! SELECT * FROM parse_hex_error_view;
                contains: invalid hexadecimal digit
                """
            )
        )


class DataflowErrorRetraction(Check):
    def initialize(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                > CREATE TABLE dataflow_error_retraction_table (f1 STRING);
                > CREATE MATERIALIZED VIEW dataflow_error_retraction_view AS SELECT f1::INTEGER FROM dataflow_error_retraction_table;
                > INSERT INTO dataflow_error_retraction_table VALUES ('123');
                > INSERT INTO dataflow_error_retraction_table VALUES ('abc');
                > INSERT INTO dataflow_error_retraction_table VALUES ('klm');
                > INSERT INTO dataflow_error_retraction_table VALUES ('234');
                ! SELECT * FROM dataflow_error_retraction_view;
                contains: invalid input syntax for type integer
            """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(s)
            for s in [
                dedent(
                    """
                > DELETE FROM dataflow_error_retraction_table WHERE f1 = 'abc'
                """
                ),
                dedent(
                    """
                > DELETE FROM dataflow_error_retraction_table WHERE f1 = 'klm'
                """
                ),
            ]
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                > SELECT * FROM dataflow_error_retraction_view;
                123
                234
                """
            )
        )


def schemas() -> str:
    return dedent(
        """
       $ set schema-f1={
           "type" : "record",
           "name" : "test",
           "fields" : [
               {"name":"f1", "type":"string"}
           ]
         }

       $ set schema-f2={
           "type" : "record",
           "name" : "test",
           "fields" : [
               {"name":"f2", "type":"int"}
           ]
         }
       """
    )


@externally_idempotent(False)
class DecodeError(Check):
    def initialize(self) -> Testdrive:
        return Testdrive(
            schemas()
            + dedent(
                """
                $ kafka-create-topic topic=decode-error

                $ kafka-ingest format=avro topic=decode-error schema=${schema-f1} repeat=1
                {"f1": "A"}

                > CREATE SOURCE decode_error
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-${testdrive.seed}')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE
            """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(schemas() + dedent(s))
            for s in [
                """
                # {"f2": 123456789}, no publish
                $ kafka-ingest format=bytes topic=decode-error repeat=1
                \\x00\x00\x00\x00\x01\xaa\xb4\xde\x75
                """,
                """
                $ kafka-ingest format=bytes topic=decode-error repeat=1
                ABCD
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                ! SELECT * FROM decode_error
                contains: Decode error
                """
            )
        )


class DecodeErrorUpsertValue(Check):
    def initialize(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                $ kafka-create-topic topic=decode-error-upsert-value

                $ set schema={
                  "type" : "record",
                  "name" : "test",
                  "fields" : [
                    {"name":"f1", "type":"int"}
                    ]
                  }

                $ kafka-ingest format=avro topic=decode-error-upsert-value key-format=bytes key-terminator=: schema=${schema}
                key0: {"f1": 1}
                key1: {"f1": 2}
                key2: {"f1": 3}

                > CREATE CLUSTER decode_error_upsert_value_cluster SIZE '1';

                > CREATE SOURCE decode_error_upsert_value
                  IN CLUSTER decode_error_upsert_value_cluster
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-upsert-value-${testdrive.seed}')
                  KEY FORMAT TEXT
                  VALUE FORMAT AVRO USING SCHEMA '${schema}'
                  ENVELOPE UPSERT

                $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes
                key1: garbage

                ! SELECT * FROM decode_error_upsert_value
                contains: avro deserialization error
            """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(
                dedent(
                    """
                    $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes
                    key2: garbage2

                    ! SELECT * FROM decode_error_upsert_value
                    contains: avro deserialization error
                    """
                )
            ),
            Testdrive(
                dedent(
                    """
                    # Ingest valid avro, but with an incompatible schema
                    $ set schema-string={
                       "type" : "record",
                       "name" : "test",
                       "fields" : [
                        {"name":"f1", "type":"string"}
                       ]
                      }

                    $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=avro schema=${schema-string} confluent-wire-format=false
                    key3: {"f1": "garbage3"}

                    ! SELECT * FROM decode_error_upsert_value
                    contains: avro deserialization error
                    """,
                )
            ),
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                # Retract all the garbage and confirm the source is now operational

                $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes
                key1:
                key2:
                key3:

                > SELECT f1 FROM decode_error_upsert_value
                1
                """
            )
        )


class DecodeErrorUpsertKey(Check):
    def initialize(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                $ kafka-create-topic topic=decode-error-upsert-key

                $ set key-schema={
                  "type" : "record",
                  "name" : "test",
                  "fields" : [
                    {"name":"f1", "type":"int"}
                    ]
                  }

                $ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema}
                {"f1": 1} value1
                {"f1": 2} value2

                > CREATE SOURCE decode_error_upsert_key
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-upsert-key-${testdrive.seed}')
                  KEY FORMAT AVRO USING SCHEMA '${key-schema}'
                  VALUE FORMAT BYTES
                  ENVELOPE UPSERT

                $ kafka-ingest topic=decode-error-upsert-key key-format=bytes key-terminator=: format=bytes
                garbage1: value3

                ! SELECT * FROM decode_error_upsert_key
                contains: avro deserialization error
            """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(
                dedent(
                    """
                    # Retract existing garbage
                    $ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=:
                    garbage1:

                    # And introduce a new one -- valid avro, but with an incompatible schema
                    $ set key-schema-string={
                       "type" : "record",
                       "name" : "test",
                       "fields" : [
                        {"name":"f1", "type":"string"}
                       ]
                      }

                    $ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema-string} confluent-wire-format=false
                    {"f1": "garbage2"} value4

                    ! SELECT * FROM decode_error_upsert_key
                    contains: avro deserialization error
                    """
                )
            ),
            Testdrive(
                dedent(
                    """
                    # Retract existing garbage and introduce a new one
                    $ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=:
                    garbage3: value4

                    $ set key-schema-string={
                       "type" : "record",
                       "name" : "test",
                       "fields" : [
                        {"name":"f1", "type":"string"}
                       ]
                      }

                    $ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema-string} confluent-wire-format=false
                    {"f1": "garbage2"}

                    ! SELECT * FROM decode_error_upsert_key
                    contains: avro deserialization error
                    """,
                )
            ),
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                # Retract any remaining garbage
                $ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=:
                garbage3:

                # Source should return to operational status
                > SELECT f1 FROM decode_error_upsert_key
                1
                2
                """
            )
        )

Functions

def schemas() ‑> str
Expand source code Browse git
def schemas() -> str:
    return dedent(
        """
       $ set schema-f1={
           "type" : "record",
           "name" : "test",
           "fields" : [
               {"name":"f1", "type":"string"}
           ]
         }

       $ set schema-f2={
           "type" : "record",
           "name" : "test",
           "fields" : [
               {"name":"f2", "type":"int"}
           ]
         }
       """
    )

Classes

class DataflowErrorRetraction (base_version: MzVersion, rng: random.Random | None)
Expand source code Browse git
class DataflowErrorRetraction(Check):
    def initialize(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                > CREATE TABLE dataflow_error_retraction_table (f1 STRING);
                > CREATE MATERIALIZED VIEW dataflow_error_retraction_view AS SELECT f1::INTEGER FROM dataflow_error_retraction_table;
                > INSERT INTO dataflow_error_retraction_table VALUES ('123');
                > INSERT INTO dataflow_error_retraction_table VALUES ('abc');
                > INSERT INTO dataflow_error_retraction_table VALUES ('klm');
                > INSERT INTO dataflow_error_retraction_table VALUES ('234');
                ! SELECT * FROM dataflow_error_retraction_view;
                contains: invalid input syntax for type integer
            """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(s)
            for s in [
                dedent(
                    """
                > DELETE FROM dataflow_error_retraction_table WHERE f1 = 'abc'
                """
                ),
                dedent(
                    """
                > DELETE FROM dataflow_error_retraction_table WHERE f1 = 'klm'
                """
                ),
            ]
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                > SELECT * FROM dataflow_error_retraction_view;
                123
                234
                """
            )
        )

Ancestors

Methods

def initialize(self) ‑> Testdrive
Expand source code Browse git
def initialize(self) -> Testdrive:
    return Testdrive(
        dedent(
            """
            > CREATE TABLE dataflow_error_retraction_table (f1 STRING);
            > CREATE MATERIALIZED VIEW dataflow_error_retraction_view AS SELECT f1::INTEGER FROM dataflow_error_retraction_table;
            > INSERT INTO dataflow_error_retraction_table VALUES ('123');
            > INSERT INTO dataflow_error_retraction_table VALUES ('abc');
            > INSERT INTO dataflow_error_retraction_table VALUES ('klm');
            > INSERT INTO dataflow_error_retraction_table VALUES ('234');
            ! SELECT * FROM dataflow_error_retraction_view;
            contains: invalid input syntax for type integer
        """
        )
    )
def manipulate(self) ‑> list[Testdrive]
Expand source code Browse git
def manipulate(self) -> list[Testdrive]:
    return [
        Testdrive(s)
        for s in [
            dedent(
                """
            > DELETE FROM dataflow_error_retraction_table WHERE f1 = 'abc'
            """
            ),
            dedent(
                """
            > DELETE FROM dataflow_error_retraction_table WHERE f1 = 'klm'
            """
            ),
        ]
    ]
def validate(self) ‑> Testdrive
Expand source code Browse git
def validate(self) -> Testdrive:
    return Testdrive(
        dedent(
            """
            > SELECT * FROM dataflow_error_retraction_view;
            123
            234
            """
        )
    )
class DecodeError (base_version: MzVersion, rng: random.Random | None)
Expand source code Browse git
@externally_idempotent(False)
class DecodeError(Check):
    def initialize(self) -> Testdrive:
        return Testdrive(
            schemas()
            + dedent(
                """
                $ kafka-create-topic topic=decode-error

                $ kafka-ingest format=avro topic=decode-error schema=${schema-f1} repeat=1
                {"f1": "A"}

                > CREATE SOURCE decode_error
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-${testdrive.seed}')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE
            """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(schemas() + dedent(s))
            for s in [
                """
                # {"f2": 123456789}, no publish
                $ kafka-ingest format=bytes topic=decode-error repeat=1
                \\x00\x00\x00\x00\x01\xaa\xb4\xde\x75
                """,
                """
                $ kafka-ingest format=bytes topic=decode-error repeat=1
                ABCD
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                ! SELECT * FROM decode_error
                contains: Decode error
                """
            )
        )

Ancestors

Class variables

var externally_idempotent : bool

Methods

def initialize(self) ‑> Testdrive
Expand source code Browse git
def initialize(self) -> Testdrive:
    return Testdrive(
        schemas()
        + dedent(
            """
            $ kafka-create-topic topic=decode-error

            $ kafka-ingest format=avro topic=decode-error schema=${schema-f1} repeat=1
            {"f1": "A"}

            > CREATE SOURCE decode_error
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-${testdrive.seed}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE NONE
        """
        )
    )
def manipulate(self) ‑> list[Testdrive]
Expand source code Browse git
def manipulate(self) -> list[Testdrive]:
    return [
        Testdrive(schemas() + dedent(s))
        for s in [
            """
            # {"f2": 123456789}, no publish
            $ kafka-ingest format=bytes topic=decode-error repeat=1
            \\x00\x00\x00\x00\x01\xaa\xb4\xde\x75
            """,
            """
            $ kafka-ingest format=bytes topic=decode-error repeat=1
            ABCD
            """,
        ]
    ]
def validate(self) ‑> Testdrive
Expand source code Browse git
def validate(self) -> Testdrive:
    return Testdrive(
        dedent(
            """
            ! SELECT * FROM decode_error
            contains: Decode error
            """
        )
    )
class DecodeErrorUpsertKey (base_version: MzVersion, rng: random.Random | None)
Expand source code Browse git
class DecodeErrorUpsertKey(Check):
    def initialize(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                $ kafka-create-topic topic=decode-error-upsert-key

                $ set key-schema={
                  "type" : "record",
                  "name" : "test",
                  "fields" : [
                    {"name":"f1", "type":"int"}
                    ]
                  }

                $ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema}
                {"f1": 1} value1
                {"f1": 2} value2

                > CREATE SOURCE decode_error_upsert_key
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-upsert-key-${testdrive.seed}')
                  KEY FORMAT AVRO USING SCHEMA '${key-schema}'
                  VALUE FORMAT BYTES
                  ENVELOPE UPSERT

                $ kafka-ingest topic=decode-error-upsert-key key-format=bytes key-terminator=: format=bytes
                garbage1: value3

                ! SELECT * FROM decode_error_upsert_key
                contains: avro deserialization error
            """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(
                dedent(
                    """
                    # Retract existing garbage
                    $ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=:
                    garbage1:

                    # And introduce a new one -- valid avro, but with an incompatible schema
                    $ set key-schema-string={
                       "type" : "record",
                       "name" : "test",
                       "fields" : [
                        {"name":"f1", "type":"string"}
                       ]
                      }

                    $ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema-string} confluent-wire-format=false
                    {"f1": "garbage2"} value4

                    ! SELECT * FROM decode_error_upsert_key
                    contains: avro deserialization error
                    """
                )
            ),
            Testdrive(
                dedent(
                    """
                    # Retract existing garbage and introduce a new one
                    $ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=:
                    garbage3: value4

                    $ set key-schema-string={
                       "type" : "record",
                       "name" : "test",
                       "fields" : [
                        {"name":"f1", "type":"string"}
                       ]
                      }

                    $ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema-string} confluent-wire-format=false
                    {"f1": "garbage2"}

                    ! SELECT * FROM decode_error_upsert_key
                    contains: avro deserialization error
                    """,
                )
            ),
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                # Retract any remaining garbage
                $ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=:
                garbage3:

                # Source should return to operational status
                > SELECT f1 FROM decode_error_upsert_key
                1
                2
                """
            )
        )

Ancestors

Methods

def initialize(self) ‑> Testdrive
Expand source code Browse git
def initialize(self) -> Testdrive:
    return Testdrive(
        dedent(
            """
            $ kafka-create-topic topic=decode-error-upsert-key

            $ set key-schema={
              "type" : "record",
              "name" : "test",
              "fields" : [
                {"name":"f1", "type":"int"}
                ]
              }

            $ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema}
            {"f1": 1} value1
            {"f1": 2} value2

            > CREATE SOURCE decode_error_upsert_key
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-upsert-key-${testdrive.seed}')
              KEY FORMAT AVRO USING SCHEMA '${key-schema}'
              VALUE FORMAT BYTES
              ENVELOPE UPSERT

            $ kafka-ingest topic=decode-error-upsert-key key-format=bytes key-terminator=: format=bytes
            garbage1: value3

            ! SELECT * FROM decode_error_upsert_key
            contains: avro deserialization error
        """
        )
    )
def manipulate(self) ‑> list[Testdrive]
Expand source code Browse git
def manipulate(self) -> list[Testdrive]:
    return [
        Testdrive(
            dedent(
                """
                # Retract existing garbage
                $ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=:
                garbage1:

                # And introduce a new one -- valid avro, but with an incompatible schema
                $ set key-schema-string={
                   "type" : "record",
                   "name" : "test",
                   "fields" : [
                    {"name":"f1", "type":"string"}
                   ]
                  }

                $ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema-string} confluent-wire-format=false
                {"f1": "garbage2"} value4

                ! SELECT * FROM decode_error_upsert_key
                contains: avro deserialization error
                """
            )
        ),
        Testdrive(
            dedent(
                """
                # Retract existing garbage and introduce a new one
                $ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=:
                garbage3: value4

                $ set key-schema-string={
                   "type" : "record",
                   "name" : "test",
                   "fields" : [
                    {"name":"f1", "type":"string"}
                   ]
                  }

                $ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema-string} confluent-wire-format=false
                {"f1": "garbage2"}

                ! SELECT * FROM decode_error_upsert_key
                contains: avro deserialization error
                """,
            )
        ),
    ]
def validate(self) ‑> Testdrive
Expand source code Browse git
def validate(self) -> Testdrive:
    return Testdrive(
        dedent(
            """
            # Retract any remaining garbage
            $ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=:
            garbage3:

            # Source should return to operational status
            > SELECT f1 FROM decode_error_upsert_key
            1
            2
            """
        )
    )
class DecodeErrorUpsertValue (base_version: MzVersion, rng: random.Random | None)
Expand source code Browse git
class DecodeErrorUpsertValue(Check):
    def initialize(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                $ kafka-create-topic topic=decode-error-upsert-value

                $ set schema={
                  "type" : "record",
                  "name" : "test",
                  "fields" : [
                    {"name":"f1", "type":"int"}
                    ]
                  }

                $ kafka-ingest format=avro topic=decode-error-upsert-value key-format=bytes key-terminator=: schema=${schema}
                key0: {"f1": 1}
                key1: {"f1": 2}
                key2: {"f1": 3}

                > CREATE CLUSTER decode_error_upsert_value_cluster SIZE '1';

                > CREATE SOURCE decode_error_upsert_value
                  IN CLUSTER decode_error_upsert_value_cluster
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-upsert-value-${testdrive.seed}')
                  KEY FORMAT TEXT
                  VALUE FORMAT AVRO USING SCHEMA '${schema}'
                  ENVELOPE UPSERT

                $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes
                key1: garbage

                ! SELECT * FROM decode_error_upsert_value
                contains: avro deserialization error
            """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(
                dedent(
                    """
                    $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes
                    key2: garbage2

                    ! SELECT * FROM decode_error_upsert_value
                    contains: avro deserialization error
                    """
                )
            ),
            Testdrive(
                dedent(
                    """
                    # Ingest valid avro, but with an incompatible schema
                    $ set schema-string={
                       "type" : "record",
                       "name" : "test",
                       "fields" : [
                        {"name":"f1", "type":"string"}
                       ]
                      }

                    $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=avro schema=${schema-string} confluent-wire-format=false
                    key3: {"f1": "garbage3"}

                    ! SELECT * FROM decode_error_upsert_value
                    contains: avro deserialization error
                    """,
                )
            ),
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                # Retract all the garbage and confirm the source is now operational

                $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes
                key1:
                key2:
                key3:

                > SELECT f1 FROM decode_error_upsert_value
                1
                """
            )
        )

Ancestors

Methods

def initialize(self) ‑> Testdrive
Expand source code Browse git
def initialize(self) -> Testdrive:
    return Testdrive(
        dedent(
            """
            $ kafka-create-topic topic=decode-error-upsert-value

            $ set schema={
              "type" : "record",
              "name" : "test",
              "fields" : [
                {"name":"f1", "type":"int"}
                ]
              }

            $ kafka-ingest format=avro topic=decode-error-upsert-value key-format=bytes key-terminator=: schema=${schema}
            key0: {"f1": 1}
            key1: {"f1": 2}
            key2: {"f1": 3}

            > CREATE CLUSTER decode_error_upsert_value_cluster SIZE '1';

            > CREATE SOURCE decode_error_upsert_value
              IN CLUSTER decode_error_upsert_value_cluster
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-upsert-value-${testdrive.seed}')
              KEY FORMAT TEXT
              VALUE FORMAT AVRO USING SCHEMA '${schema}'
              ENVELOPE UPSERT

            $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes
            key1: garbage

            ! SELECT * FROM decode_error_upsert_value
            contains: avro deserialization error
        """
        )
    )
def manipulate(self) ‑> list[Testdrive]
Expand source code Browse git
def manipulate(self) -> list[Testdrive]:
    return [
        Testdrive(
            dedent(
                """
                $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes
                key2: garbage2

                ! SELECT * FROM decode_error_upsert_value
                contains: avro deserialization error
                """
            )
        ),
        Testdrive(
            dedent(
                """
                # Ingest valid avro, but with an incompatible schema
                $ set schema-string={
                   "type" : "record",
                   "name" : "test",
                   "fields" : [
                    {"name":"f1", "type":"string"}
                   ]
                  }

                $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=avro schema=${schema-string} confluent-wire-format=false
                key3: {"f1": "garbage3"}

                ! SELECT * FROM decode_error_upsert_value
                contains: avro deserialization error
                """,
            )
        ),
    ]
def validate(self) ‑> Testdrive
Expand source code Browse git
def validate(self) -> Testdrive:
    return Testdrive(
        dedent(
            """
            # Retract all the garbage and confirm the source is now operational

            $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes
            key1:
            key2:
            key3:

            > SELECT f1 FROM decode_error_upsert_value
            1
            """
        )
    )
class ParseError (base_version: MzVersion, rng: random.Random | None)
Expand source code Browse git
class ParseError(Check):
    def initialize(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                > CREATE TABLE parse_error_table (f1 STRING);
                > CREATE MATERIALIZED VIEW parse_error_view AS SELECT f1::INTEGER FROM parse_error_table;
                > INSERT INTO parse_error_table VALUES ('123');
            """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(s)
            for s in [
                "> INSERT INTO parse_error_table VALUES ('abc'), ('234');",
                "> INSERT INTO parse_error_table VALUES ('345'), ('klm');",
            ]
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                ! SELECT * FROM parse_error_view;
                contains: invalid input syntax for type integer
                """
            )
        )

Ancestors

Methods

def initialize(self) ‑> Testdrive
Expand source code Browse git
def initialize(self) -> Testdrive:
    return Testdrive(
        dedent(
            """
            > CREATE TABLE parse_error_table (f1 STRING);
            > CREATE MATERIALIZED VIEW parse_error_view AS SELECT f1::INTEGER FROM parse_error_table;
            > INSERT INTO parse_error_table VALUES ('123');
        """
        )
    )
def manipulate(self) ‑> list[Testdrive]
Expand source code Browse git
def manipulate(self) -> list[Testdrive]:
    return [
        Testdrive(s)
        for s in [
            "> INSERT INTO parse_error_table VALUES ('abc'), ('234');",
            "> INSERT INTO parse_error_table VALUES ('345'), ('klm');",
        ]
    ]
def validate(self) ‑> Testdrive
Expand source code Browse git
def validate(self) -> Testdrive:
    return Testdrive(
        dedent(
            """
            ! SELECT * FROM parse_error_view;
            contains: invalid input syntax for type integer
            """
        )
    )
class ParseHexError (base_version: MzVersion, rng: random.Random | None)
Expand source code Browse git
class ParseHexError(Check):
    def initialize(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                > CREATE TABLE parse_hex_error_table (f1 STRING);
                > CREATE MATERIALIZED VIEW parse_hex_error_view AS SELECT decode(f1, 'hex') FROM parse_hex_error_table;
                > INSERT INTO parse_hex_error_table VALUES ('aa');
            """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(s)
            for s in [
                "> INSERT INTO parse_hex_error_table VALUES ('bb'), ('xx');",
                "> INSERT INTO parse_hex_error_table VALUES ('yy'), ('cc');",
            ]
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                ! SELECT * FROM parse_hex_error_view;
                contains: invalid hexadecimal digit
                """
            )
        )

Ancestors

Methods

def initialize(self) ‑> Testdrive
Expand source code Browse git
def initialize(self) -> Testdrive:
    return Testdrive(
        dedent(
            """
            > CREATE TABLE parse_hex_error_table (f1 STRING);
            > CREATE MATERIALIZED VIEW parse_hex_error_view AS SELECT decode(f1, 'hex') FROM parse_hex_error_table;
            > INSERT INTO parse_hex_error_table VALUES ('aa');
        """
        )
    )
def manipulate(self) ‑> list[Testdrive]
Expand source code Browse git
def manipulate(self) -> list[Testdrive]:
    return [
        Testdrive(s)
        for s in [
            "> INSERT INTO parse_hex_error_table VALUES ('bb'), ('xx');",
            "> INSERT INTO parse_hex_error_table VALUES ('yy'), ('cc');",
        ]
    ]
def validate(self) ‑> Testdrive
Expand source code Browse git
def validate(self) -> Testdrive:
    return Testdrive(
        dedent(
            """
            ! SELECT * FROM parse_hex_error_view;
            contains: invalid hexadecimal digit
            """
        )
    )