Module materialize.checks.all_checks.error
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from textwrap import dedent
from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check, externally_idempotent
class ParseError(Check):
def initialize(self) -> Testdrive:
return Testdrive(
dedent(
"""
> CREATE TABLE parse_error_table (f1 STRING);
> CREATE MATERIALIZED VIEW parse_error_view AS SELECT f1::INTEGER FROM parse_error_table;
> INSERT INTO parse_error_table VALUES ('123');
"""
)
)
def manipulate(self) -> list[Testdrive]:
return [
Testdrive(s)
for s in [
"> INSERT INTO parse_error_table VALUES ('abc'), ('234');",
"> INSERT INTO parse_error_table VALUES ('345'), ('klm');",
]
]
def validate(self) -> Testdrive:
return Testdrive(
dedent(
"""
! SELECT * FROM parse_error_view;
contains: invalid input syntax for type integer
"""
)
)
class ParseHexError(Check):
def initialize(self) -> Testdrive:
return Testdrive(
dedent(
"""
> CREATE TABLE parse_hex_error_table (f1 STRING);
> CREATE MATERIALIZED VIEW parse_hex_error_view AS SELECT decode(f1, 'hex') FROM parse_hex_error_table;
> INSERT INTO parse_hex_error_table VALUES ('aa');
"""
)
)
def manipulate(self) -> list[Testdrive]:
return [
Testdrive(s)
for s in [
"> INSERT INTO parse_hex_error_table VALUES ('bb'), ('xx');",
"> INSERT INTO parse_hex_error_table VALUES ('yy'), ('cc');",
]
]
def validate(self) -> Testdrive:
return Testdrive(
dedent(
"""
! SELECT * FROM parse_hex_error_view;
contains: invalid hexadecimal digit
"""
)
)
class DataflowErrorRetraction(Check):
def initialize(self) -> Testdrive:
return Testdrive(
dedent(
"""
> CREATE TABLE dataflow_error_retraction_table (f1 STRING);
> CREATE MATERIALIZED VIEW dataflow_error_retraction_view AS SELECT f1::INTEGER FROM dataflow_error_retraction_table;
> INSERT INTO dataflow_error_retraction_table VALUES ('123');
> INSERT INTO dataflow_error_retraction_table VALUES ('abc');
> INSERT INTO dataflow_error_retraction_table VALUES ('klm');
> INSERT INTO dataflow_error_retraction_table VALUES ('234');
! SELECT * FROM dataflow_error_retraction_view;
contains: invalid input syntax for type integer
"""
)
)
def manipulate(self) -> list[Testdrive]:
return [
Testdrive(s)
for s in [
dedent(
"""
> DELETE FROM dataflow_error_retraction_table WHERE f1 = 'abc'
"""
),
dedent(
"""
> DELETE FROM dataflow_error_retraction_table WHERE f1 = 'klm'
"""
),
]
]
def validate(self) -> Testdrive:
return Testdrive(
dedent(
"""
> SELECT * FROM dataflow_error_retraction_view;
123
234
"""
)
)
def schemas() -> str:
return dedent(
"""
$ set schema-f1={
"type" : "record",
"name" : "test",
"fields" : [
{"name":"f1", "type":"string"}
]
}
$ set schema-f2={
"type" : "record",
"name" : "test",
"fields" : [
{"name":"f2", "type":"int"}
]
}
"""
)
@externally_idempotent(False)
class DecodeError(Check):
def initialize(self) -> Testdrive:
return Testdrive(
schemas()
+ dedent(
"""
$ kafka-create-topic topic=decode-error
$ kafka-ingest format=avro topic=decode-error schema=${schema-f1} repeat=1
{"f1": "A"}
> CREATE SOURCE decode_error
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-${testdrive.seed}')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE NONE
"""
)
)
def manipulate(self) -> list[Testdrive]:
return [
Testdrive(schemas() + dedent(s))
for s in [
"""
# {"f2": 123456789}, no publish
$ kafka-ingest format=bytes topic=decode-error repeat=1
\\x00\x00\x00\x00\x01\xaa\xb4\xde\x75
""",
"""
$ kafka-ingest format=bytes topic=decode-error repeat=1
ABCD
""",
]
]
def validate(self) -> Testdrive:
return Testdrive(
dedent(
"""
! SELECT * FROM decode_error
contains: Decode error
"""
)
)
class DecodeErrorUpsertValue(Check):
def initialize(self) -> Testdrive:
return Testdrive(
dedent(
"""
$ kafka-create-topic topic=decode-error-upsert-value
$ set schema={
"type" : "record",
"name" : "test",
"fields" : [
{"name":"f1", "type":"int"}
]
}
$ kafka-ingest format=avro topic=decode-error-upsert-value key-format=bytes key-terminator=: schema=${schema}
key0: {"f1": 1}
key1: {"f1": 2}
key2: {"f1": 3}
> CREATE CLUSTER decode_error_upsert_value_cluster SIZE '1';
> CREATE SOURCE decode_error_upsert_value
IN CLUSTER decode_error_upsert_value_cluster
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-upsert-value-${testdrive.seed}')
KEY FORMAT TEXT
VALUE FORMAT AVRO USING SCHEMA '${schema}'
ENVELOPE UPSERT
$ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes
key1: garbage
! SELECT * FROM decode_error_upsert_value
contains: avro deserialization error
"""
)
)
def manipulate(self) -> list[Testdrive]:
return [
Testdrive(
dedent(
"""
$ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes
key2: garbage2
! SELECT * FROM decode_error_upsert_value
contains: avro deserialization error
"""
)
),
Testdrive(
dedent(
"""
# Ingest valid avro, but with an incompatible schema
$ set schema-string={
"type" : "record",
"name" : "test",
"fields" : [
{"name":"f1", "type":"string"}
]
}
$ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=avro schema=${schema-string} confluent-wire-format=false
key3: {"f1": "garbage3"}
! SELECT * FROM decode_error_upsert_value
contains: avro deserialization error
""",
)
),
]
def validate(self) -> Testdrive:
return Testdrive(
dedent(
"""
# Retract all the garbage and confirm the source is now operational
$ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes
key1:
key2:
key3:
> SELECT f1 FROM decode_error_upsert_value
1
"""
)
)
class DecodeErrorUpsertKey(Check):
def initialize(self) -> Testdrive:
return Testdrive(
dedent(
"""
$ kafka-create-topic topic=decode-error-upsert-key
$ set key-schema={
"type" : "record",
"name" : "test",
"fields" : [
{"name":"f1", "type":"int"}
]
}
$ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema}
{"f1": 1} value1
{"f1": 2} value2
> CREATE SOURCE decode_error_upsert_key
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-upsert-key-${testdrive.seed}')
KEY FORMAT AVRO USING SCHEMA '${key-schema}'
VALUE FORMAT BYTES
ENVELOPE UPSERT
$ kafka-ingest topic=decode-error-upsert-key key-format=bytes key-terminator=: format=bytes
garbage1: value3
! SELECT * FROM decode_error_upsert_key
contains: avro deserialization error
"""
)
)
def manipulate(self) -> list[Testdrive]:
return [
Testdrive(
dedent(
"""
# Retract existing garbage
$ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=:
garbage1:
# And introduce a new one -- valid avro, but with an incompatible schema
$ set key-schema-string={
"type" : "record",
"name" : "test",
"fields" : [
{"name":"f1", "type":"string"}
]
}
$ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema-string} confluent-wire-format=false
{"f1": "garbage2"} value4
! SELECT * FROM decode_error_upsert_key
contains: avro deserialization error
"""
)
),
Testdrive(
dedent(
"""
# Retract existing garbage and introduce a new one
$ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=:
garbage3: value4
$ set key-schema-string={
"type" : "record",
"name" : "test",
"fields" : [
{"name":"f1", "type":"string"}
]
}
$ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema-string} confluent-wire-format=false
{"f1": "garbage2"}
! SELECT * FROM decode_error_upsert_key
contains: avro deserialization error
""",
)
),
]
def validate(self) -> Testdrive:
return Testdrive(
dedent(
"""
# Retract any remaining garbage
$ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=:
garbage3:
# Source should return to operational status
> SELECT f1 FROM decode_error_upsert_key
1
2
"""
)
)
Functions
def schemas() ‑> str
-
Expand source code Browse git
def schemas() -> str: return dedent( """ $ set schema-f1={ "type" : "record", "name" : "test", "fields" : [ {"name":"f1", "type":"string"} ] } $ set schema-f2={ "type" : "record", "name" : "test", "fields" : [ {"name":"f2", "type":"int"} ] } """ )
Classes
class DataflowErrorRetraction (base_version: MzVersion, rng: random.Random | None)
-
Expand source code Browse git
class DataflowErrorRetraction(Check): def initialize(self) -> Testdrive: return Testdrive( dedent( """ > CREATE TABLE dataflow_error_retraction_table (f1 STRING); > CREATE MATERIALIZED VIEW dataflow_error_retraction_view AS SELECT f1::INTEGER FROM dataflow_error_retraction_table; > INSERT INTO dataflow_error_retraction_table VALUES ('123'); > INSERT INTO dataflow_error_retraction_table VALUES ('abc'); > INSERT INTO dataflow_error_retraction_table VALUES ('klm'); > INSERT INTO dataflow_error_retraction_table VALUES ('234'); ! SELECT * FROM dataflow_error_retraction_view; contains: invalid input syntax for type integer """ ) ) def manipulate(self) -> list[Testdrive]: return [ Testdrive(s) for s in [ dedent( """ > DELETE FROM dataflow_error_retraction_table WHERE f1 = 'abc' """ ), dedent( """ > DELETE FROM dataflow_error_retraction_table WHERE f1 = 'klm' """ ), ] ] def validate(self) -> Testdrive: return Testdrive( dedent( """ > SELECT * FROM dataflow_error_retraction_view; 123 234 """ ) )
Ancestors
Methods
def initialize(self) ‑> Testdrive
-
Expand source code Browse git
def initialize(self) -> Testdrive: return Testdrive( dedent( """ > CREATE TABLE dataflow_error_retraction_table (f1 STRING); > CREATE MATERIALIZED VIEW dataflow_error_retraction_view AS SELECT f1::INTEGER FROM dataflow_error_retraction_table; > INSERT INTO dataflow_error_retraction_table VALUES ('123'); > INSERT INTO dataflow_error_retraction_table VALUES ('abc'); > INSERT INTO dataflow_error_retraction_table VALUES ('klm'); > INSERT INTO dataflow_error_retraction_table VALUES ('234'); ! SELECT * FROM dataflow_error_retraction_view; contains: invalid input syntax for type integer """ ) )
def manipulate(self) ‑> list[Testdrive]
-
Expand source code Browse git
def manipulate(self) -> list[Testdrive]: return [ Testdrive(s) for s in [ dedent( """ > DELETE FROM dataflow_error_retraction_table WHERE f1 = 'abc' """ ), dedent( """ > DELETE FROM dataflow_error_retraction_table WHERE f1 = 'klm' """ ), ] ]
def validate(self) ‑> Testdrive
-
Expand source code Browse git
def validate(self) -> Testdrive: return Testdrive( dedent( """ > SELECT * FROM dataflow_error_retraction_view; 123 234 """ ) )
class DecodeError (base_version: MzVersion, rng: random.Random | None)
-
Expand source code Browse git
@externally_idempotent(False) class DecodeError(Check): def initialize(self) -> Testdrive: return Testdrive( schemas() + dedent( """ $ kafka-create-topic topic=decode-error $ kafka-ingest format=avro topic=decode-error schema=${schema-f1} repeat=1 {"f1": "A"} > CREATE SOURCE decode_error FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-${testdrive.seed}') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE NONE """ ) ) def manipulate(self) -> list[Testdrive]: return [ Testdrive(schemas() + dedent(s)) for s in [ """ # {"f2": 123456789}, no publish $ kafka-ingest format=bytes topic=decode-error repeat=1 \\x00\x00\x00\x00\x01\xaa\xb4\xde\x75 """, """ $ kafka-ingest format=bytes topic=decode-error repeat=1 ABCD """, ] ] def validate(self) -> Testdrive: return Testdrive( dedent( """ ! SELECT * FROM decode_error contains: Decode error """ ) )
Ancestors
Class variables
var externally_idempotent : bool
Methods
def initialize(self) ‑> Testdrive
-
Expand source code Browse git
def initialize(self) -> Testdrive: return Testdrive( schemas() + dedent( """ $ kafka-create-topic topic=decode-error $ kafka-ingest format=avro topic=decode-error schema=${schema-f1} repeat=1 {"f1": "A"} > CREATE SOURCE decode_error FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-${testdrive.seed}') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE NONE """ ) )
def manipulate(self) ‑> list[Testdrive]
-
Expand source code Browse git
def manipulate(self) -> list[Testdrive]: return [ Testdrive(schemas() + dedent(s)) for s in [ """ # {"f2": 123456789}, no publish $ kafka-ingest format=bytes topic=decode-error repeat=1 \\x00\x00\x00\x00\x01\xaa\xb4\xde\x75 """, """ $ kafka-ingest format=bytes topic=decode-error repeat=1 ABCD """, ] ]
def validate(self) ‑> Testdrive
-
Expand source code Browse git
def validate(self) -> Testdrive: return Testdrive( dedent( """ ! SELECT * FROM decode_error contains: Decode error """ ) )
class DecodeErrorUpsertKey (base_version: MzVersion, rng: random.Random | None)
-
Expand source code Browse git
class DecodeErrorUpsertKey(Check): def initialize(self) -> Testdrive: return Testdrive( dedent( """ $ kafka-create-topic topic=decode-error-upsert-key $ set key-schema={ "type" : "record", "name" : "test", "fields" : [ {"name":"f1", "type":"int"} ] } $ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema} {"f1": 1} value1 {"f1": 2} value2 > CREATE SOURCE decode_error_upsert_key FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-upsert-key-${testdrive.seed}') KEY FORMAT AVRO USING SCHEMA '${key-schema}' VALUE FORMAT BYTES ENVELOPE UPSERT $ kafka-ingest topic=decode-error-upsert-key key-format=bytes key-terminator=: format=bytes garbage1: value3 ! SELECT * FROM decode_error_upsert_key contains: avro deserialization error """ ) ) def manipulate(self) -> list[Testdrive]: return [ Testdrive( dedent( """ # Retract existing garbage $ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=: garbage1: # And introduce a new one -- valid avro, but with an incompatible schema $ set key-schema-string={ "type" : "record", "name" : "test", "fields" : [ {"name":"f1", "type":"string"} ] } $ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema-string} confluent-wire-format=false {"f1": "garbage2"} value4 ! SELECT * FROM decode_error_upsert_key contains: avro deserialization error """ ) ), Testdrive( dedent( """ # Retract existing garbage and introduce a new one $ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=: garbage3: value4 $ set key-schema-string={ "type" : "record", "name" : "test", "fields" : [ {"name":"f1", "type":"string"} ] } $ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema-string} confluent-wire-format=false {"f1": "garbage2"} ! SELECT * FROM decode_error_upsert_key contains: avro deserialization error """, ) ), ] def validate(self) -> Testdrive: return Testdrive( dedent( """ # Retract any remaining garbage $ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=: garbage3: # Source should return to operational status > SELECT f1 FROM decode_error_upsert_key 1 2 """ ) )
Ancestors
Methods
def initialize(self) ‑> Testdrive
-
Expand source code Browse git
def initialize(self) -> Testdrive: return Testdrive( dedent( """ $ kafka-create-topic topic=decode-error-upsert-key $ set key-schema={ "type" : "record", "name" : "test", "fields" : [ {"name":"f1", "type":"int"} ] } $ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema} {"f1": 1} value1 {"f1": 2} value2 > CREATE SOURCE decode_error_upsert_key FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-upsert-key-${testdrive.seed}') KEY FORMAT AVRO USING SCHEMA '${key-schema}' VALUE FORMAT BYTES ENVELOPE UPSERT $ kafka-ingest topic=decode-error-upsert-key key-format=bytes key-terminator=: format=bytes garbage1: value3 ! SELECT * FROM decode_error_upsert_key contains: avro deserialization error """ ) )
def manipulate(self) ‑> list[Testdrive]
-
Expand source code Browse git
def manipulate(self) -> list[Testdrive]: return [ Testdrive( dedent( """ # Retract existing garbage $ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=: garbage1: # And introduce a new one -- valid avro, but with an incompatible schema $ set key-schema-string={ "type" : "record", "name" : "test", "fields" : [ {"name":"f1", "type":"string"} ] } $ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema-string} confluent-wire-format=false {"f1": "garbage2"} value4 ! SELECT * FROM decode_error_upsert_key contains: avro deserialization error """ ) ), Testdrive( dedent( """ # Retract existing garbage and introduce a new one $ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=: garbage3: value4 $ set key-schema-string={ "type" : "record", "name" : "test", "fields" : [ {"name":"f1", "type":"string"} ] } $ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema-string} confluent-wire-format=false {"f1": "garbage2"} ! SELECT * FROM decode_error_upsert_key contains: avro deserialization error """, ) ), ]
def validate(self) ‑> Testdrive
-
Expand source code Browse git
def validate(self) -> Testdrive: return Testdrive( dedent( """ # Retract any remaining garbage $ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=: garbage3: # Source should return to operational status > SELECT f1 FROM decode_error_upsert_key 1 2 """ ) )
class DecodeErrorUpsertValue (base_version: MzVersion, rng: random.Random | None)
-
Expand source code Browse git
class DecodeErrorUpsertValue(Check): def initialize(self) -> Testdrive: return Testdrive( dedent( """ $ kafka-create-topic topic=decode-error-upsert-value $ set schema={ "type" : "record", "name" : "test", "fields" : [ {"name":"f1", "type":"int"} ] } $ kafka-ingest format=avro topic=decode-error-upsert-value key-format=bytes key-terminator=: schema=${schema} key0: {"f1": 1} key1: {"f1": 2} key2: {"f1": 3} > CREATE CLUSTER decode_error_upsert_value_cluster SIZE '1'; > CREATE SOURCE decode_error_upsert_value IN CLUSTER decode_error_upsert_value_cluster FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-upsert-value-${testdrive.seed}') KEY FORMAT TEXT VALUE FORMAT AVRO USING SCHEMA '${schema}' ENVELOPE UPSERT $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes key1: garbage ! SELECT * FROM decode_error_upsert_value contains: avro deserialization error """ ) ) def manipulate(self) -> list[Testdrive]: return [ Testdrive( dedent( """ $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes key2: garbage2 ! SELECT * FROM decode_error_upsert_value contains: avro deserialization error """ ) ), Testdrive( dedent( """ # Ingest valid avro, but with an incompatible schema $ set schema-string={ "type" : "record", "name" : "test", "fields" : [ {"name":"f1", "type":"string"} ] } $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=avro schema=${schema-string} confluent-wire-format=false key3: {"f1": "garbage3"} ! SELECT * FROM decode_error_upsert_value contains: avro deserialization error """, ) ), ] def validate(self) -> Testdrive: return Testdrive( dedent( """ # Retract all the garbage and confirm the source is now operational $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes key1: key2: key3: > SELECT f1 FROM decode_error_upsert_value 1 """ ) )
Ancestors
Methods
def initialize(self) ‑> Testdrive
-
Expand source code Browse git
def initialize(self) -> Testdrive: return Testdrive( dedent( """ $ kafka-create-topic topic=decode-error-upsert-value $ set schema={ "type" : "record", "name" : "test", "fields" : [ {"name":"f1", "type":"int"} ] } $ kafka-ingest format=avro topic=decode-error-upsert-value key-format=bytes key-terminator=: schema=${schema} key0: {"f1": 1} key1: {"f1": 2} key2: {"f1": 3} > CREATE CLUSTER decode_error_upsert_value_cluster SIZE '1'; > CREATE SOURCE decode_error_upsert_value IN CLUSTER decode_error_upsert_value_cluster FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-upsert-value-${testdrive.seed}') KEY FORMAT TEXT VALUE FORMAT AVRO USING SCHEMA '${schema}' ENVELOPE UPSERT $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes key1: garbage ! SELECT * FROM decode_error_upsert_value contains: avro deserialization error """ ) )
def manipulate(self) ‑> list[Testdrive]
-
Expand source code Browse git
def manipulate(self) -> list[Testdrive]: return [ Testdrive( dedent( """ $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes key2: garbage2 ! SELECT * FROM decode_error_upsert_value contains: avro deserialization error """ ) ), Testdrive( dedent( """ # Ingest valid avro, but with an incompatible schema $ set schema-string={ "type" : "record", "name" : "test", "fields" : [ {"name":"f1", "type":"string"} ] } $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=avro schema=${schema-string} confluent-wire-format=false key3: {"f1": "garbage3"} ! SELECT * FROM decode_error_upsert_value contains: avro deserialization error """, ) ), ]
def validate(self) ‑> Testdrive
-
Expand source code Browse git
def validate(self) -> Testdrive: return Testdrive( dedent( """ # Retract all the garbage and confirm the source is now operational $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes key1: key2: key3: > SELECT f1 FROM decode_error_upsert_value 1 """ ) )
class ParseError (base_version: MzVersion, rng: random.Random | None)
-
Expand source code Browse git
class ParseError(Check): def initialize(self) -> Testdrive: return Testdrive( dedent( """ > CREATE TABLE parse_error_table (f1 STRING); > CREATE MATERIALIZED VIEW parse_error_view AS SELECT f1::INTEGER FROM parse_error_table; > INSERT INTO parse_error_table VALUES ('123'); """ ) ) def manipulate(self) -> list[Testdrive]: return [ Testdrive(s) for s in [ "> INSERT INTO parse_error_table VALUES ('abc'), ('234');", "> INSERT INTO parse_error_table VALUES ('345'), ('klm');", ] ] def validate(self) -> Testdrive: return Testdrive( dedent( """ ! SELECT * FROM parse_error_view; contains: invalid input syntax for type integer """ ) )
Ancestors
Methods
def initialize(self) ‑> Testdrive
-
Expand source code Browse git
def initialize(self) -> Testdrive: return Testdrive( dedent( """ > CREATE TABLE parse_error_table (f1 STRING); > CREATE MATERIALIZED VIEW parse_error_view AS SELECT f1::INTEGER FROM parse_error_table; > INSERT INTO parse_error_table VALUES ('123'); """ ) )
def manipulate(self) ‑> list[Testdrive]
-
Expand source code Browse git
def manipulate(self) -> list[Testdrive]: return [ Testdrive(s) for s in [ "> INSERT INTO parse_error_table VALUES ('abc'), ('234');", "> INSERT INTO parse_error_table VALUES ('345'), ('klm');", ] ]
def validate(self) ‑> Testdrive
-
Expand source code Browse git
def validate(self) -> Testdrive: return Testdrive( dedent( """ ! SELECT * FROM parse_error_view; contains: invalid input syntax for type integer """ ) )
class ParseHexError (base_version: MzVersion, rng: random.Random | None)
-
Expand source code Browse git
class ParseHexError(Check): def initialize(self) -> Testdrive: return Testdrive( dedent( """ > CREATE TABLE parse_hex_error_table (f1 STRING); > CREATE MATERIALIZED VIEW parse_hex_error_view AS SELECT decode(f1, 'hex') FROM parse_hex_error_table; > INSERT INTO parse_hex_error_table VALUES ('aa'); """ ) ) def manipulate(self) -> list[Testdrive]: return [ Testdrive(s) for s in [ "> INSERT INTO parse_hex_error_table VALUES ('bb'), ('xx');", "> INSERT INTO parse_hex_error_table VALUES ('yy'), ('cc');", ] ] def validate(self) -> Testdrive: return Testdrive( dedent( """ ! SELECT * FROM parse_hex_error_view; contains: invalid hexadecimal digit """ ) )
Ancestors
Methods
def initialize(self) ‑> Testdrive
-
Expand source code Browse git
def initialize(self) -> Testdrive: return Testdrive( dedent( """ > CREATE TABLE parse_hex_error_table (f1 STRING); > CREATE MATERIALIZED VIEW parse_hex_error_view AS SELECT decode(f1, 'hex') FROM parse_hex_error_table; > INSERT INTO parse_hex_error_table VALUES ('aa'); """ ) )
def manipulate(self) ‑> list[Testdrive]
-
Expand source code Browse git
def manipulate(self) -> list[Testdrive]: return [ Testdrive(s) for s in [ "> INSERT INTO parse_hex_error_table VALUES ('bb'), ('xx');", "> INSERT INTO parse_hex_error_table VALUES ('yy'), ('cc');", ] ]
def validate(self) ‑> Testdrive
-
Expand source code Browse git
def validate(self) -> Testdrive: return Testdrive( dedent( """ ! SELECT * FROM parse_hex_error_view; contains: invalid hexadecimal digit """ ) )