Module materialize.checks.all_checks.upsert_wide
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from textwrap import dedent
from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check, externally_idempotent
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
# Filler payloads used to build very wide Kafka keys and values. Each constant
# repeats a single distinct letter, so a row's first/last character and length
# are enough to tell which payload it carries.
PAD_100K = 102_400 * "X"  # 100 * 1024 characters
PAD_500K = 512_000 * "Y"  # 500 * 1024 characters
# 1000 * 1024 = 1,024,000 characters, i.e. slightly less than 1 MiB (1,048,576).
PAD_1M = 1_024_000 * "Z"
@externally_idempotent(False)
class UpsertWideValue(Check):
    """Exercise ENVELOPE UPSERT on a Kafka source whose values are very large.

    Keys "A", "B" and "C" are overwritten with the PAD_100K, PAD_500K and
    PAD_1M payloads across the phases. The materialized view keeps only the
    first character, last character and length of each value, so the expected
    rows stay small while still identifying which payload is current.
    """

    # NOTE(review): as seen here the script literals hold no blank lines or
    # continuation indentation; presumably Testdrive uses those to delimit
    # commands -- confirm against the checked-in file before relying on this.

    def initialize(self) -> Testdrive:
        """Create the topic, ingest key "A" with PAD_100K, then define source and view."""
        schema_prelude = dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
        setup_script = dedent(
            f"""
            $ kafka-create-topic topic=upsert-wide-value
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "A"}} {{"f1": "{PAD_100K}"}}
            > CREATE SOURCE upsert_wide_value
            FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-wide-value-${{testdrive.seed}}')
            FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
            ENVELOPE UPSERT
            > CREATE MATERIALIZED VIEW upsert_wide_value_view AS
            SELECT LEFT(f1, 1), RIGHT(f1, 1),
            LENGTH(f1)
            FROM upsert_wide_value
            """
        )
        return Testdrive(schema_prelude + setup_script)

    def manipulate(self) -> list[Testdrive]:
        """Return two ingestion phases that rotate the payloads across keys A, B and C."""
        schema_prelude = dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)

        # Phase 1: A -> PAD_1M, B -> PAD_500K, C -> PAD_1M.
        first_phase = f"""
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "A"}} {{"f1": "{PAD_1M}"}}
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "B"}} {{"f1": "{PAD_500K}"}}
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "C"}} {{"f1": "{PAD_1M}"}}
            """

        # Phase 2: A -> PAD_500K, B -> PAD_1M, C -> PAD_100K (the state validate() expects).
        second_phase = f"""
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "A"}} {{"f1": "{PAD_500K}"}}
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "B"}} {{"f1": "{PAD_1M}"}}
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "C"}} {{"f1": "{PAD_100K}"}}
            """

        return [
            Testdrive(schema_prelude + dedent(phase))
            for phase in (first_phase, second_phase)
        ]

    def validate(self) -> Testdrive:
        """Expect one row per payload: first character, last character and length."""
        expected_rows = dedent(
            """
            > SELECT * FROM upsert_wide_value_view
            X X 102400
            Y Y 512000
            Z Z 1024000
            """
        )
        return Testdrive(expected_rows)
@externally_idempotent(False)
class UpsertWideKey(Check):
    """Exercise ENVELOPE UPSERT on a Kafka source whose keys are very large.

    Every key is one distinguishing letter followed by PAD_1M. The
    materialized view keeps the first character, last character and length of
    the key together with the value column f1.
    """

    # NOTE(review): as seen here the script literals hold no blank lines or
    # continuation indentation; presumably Testdrive uses those to delimit
    # commands -- confirm against the checked-in file before relying on this.

    def initialize(self) -> Testdrive:
        """Create the topic, ingest wide keys A, B and C, then define source and view."""
        schema_prelude = dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
        setup_script = dedent(
            f"""
            $ kafka-create-topic topic=upsert-wide-key
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-key key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "A{PAD_1M}"}} {{"f1": "A1"}}
            {{"key1": "B{PAD_1M}"}} {{"f1": "B1"}}
            {{"key1": "C{PAD_1M}"}} {{"f1": "C1"}}
            > CREATE SOURCE upsert_wide_key
            FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-wide-key-${{testdrive.seed}}')
            FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
            ENVELOPE UPSERT
            > CREATE MATERIALIZED VIEW upsert_wide_key_view AS
            SELECT LEFT(key1, 1), RIGHT(key1, 1),
            LENGTH(key1), f1
            FROM upsert_wide_key
            """
        )
        return Testdrive(schema_prelude + setup_script)

    def manipulate(self) -> list[Testdrive]:
        """Return two phases that update key A, add a new key and send a key-only record."""
        schema_prelude = dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)

        # Phase 1: A -> A2, new key D, key-only record for B (marked as a delete in the script).
        first_phase = f"""
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-key key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "A{PAD_1M}"}} {{"f1": "A2"}}
            {{"key1": "D{PAD_1M}"}} {{"f1": "D1"}}
            # Delete B ...
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-key key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "B{PAD_1M}"}}
            """

        # Phase 2: A -> A3, new key E, key-only record for C (marked as a delete in the script).
        second_phase = f"""
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-key key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "A{PAD_1M}"}} {{"f1": "A3"}}
            {{"key1": "E{PAD_1M}"}} {{"f1": "E1"}}
            # Delete C ...
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-key key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "C{PAD_1M}"}}
            """

        return [
            Testdrive(schema_prelude + dedent(phase))
            for phase in (first_phase, second_phase)
        ]

    def validate(self) -> Testdrive:
        """Expect only keys A, D and E, each 1024001 characters long and ending in Z."""
        expected_rows = dedent(
            """
            > SELECT * FROM upsert_wide_key_view
            A Z 1024001 A3
            D Z 1024001 D1
            E Z 1024001 E1
            """
        )
        return Testdrive(expected_rows)
Classes
class UpsertWideKey (base_version: MzVersion, rng: random.Random | None)
-
Perform upsert over records with a very long/wide key.
Expand source code Browse git
@externally_idempotent(False)
class UpsertWideKey(Check):
    """Exercise ENVELOPE UPSERT on a Kafka source whose keys are very large.

    Every key is one distinguishing letter followed by PAD_1M. The
    materialized view keeps the first character, last character and length of
    the key together with the value column f1.
    """

    # NOTE(review): this snippet was collapsed onto one line; the line breaks
    # inside the script literals are reconstructed, and presumably Testdrive
    # also needs blank lines between commands -- confirm against the repository.

    def initialize(self) -> Testdrive:
        """Create the topic, ingest wide keys A, B and C, then define source and view."""
        schema_prelude = dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
        setup_script = dedent(
            f"""
            $ kafka-create-topic topic=upsert-wide-key
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-key key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "A{PAD_1M}"}} {{"f1": "A1"}}
            {{"key1": "B{PAD_1M}"}} {{"f1": "B1"}}
            {{"key1": "C{PAD_1M}"}} {{"f1": "C1"}}
            > CREATE SOURCE upsert_wide_key
            FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-wide-key-${{testdrive.seed}}')
            FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
            ENVELOPE UPSERT
            > CREATE MATERIALIZED VIEW upsert_wide_key_view AS
            SELECT LEFT(key1, 1), RIGHT(key1, 1),
            LENGTH(key1), f1
            FROM upsert_wide_key
            """
        )
        return Testdrive(schema_prelude + setup_script)

    def manipulate(self) -> list[Testdrive]:
        """Return two phases that update key A, add a new key and send a key-only record."""
        schema_prelude = dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)

        # Phase 1: A -> A2, new key D, key-only record for B (marked as a delete in the script).
        first_phase = f"""
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-key key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "A{PAD_1M}"}} {{"f1": "A2"}}
            {{"key1": "D{PAD_1M}"}} {{"f1": "D1"}}
            # Delete B ...
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-key key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "B{PAD_1M}"}}
            """

        # Phase 2: A -> A3, new key E, key-only record for C (marked as a delete in the script).
        second_phase = f"""
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-key key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "A{PAD_1M}"}} {{"f1": "A3"}}
            {{"key1": "E{PAD_1M}"}} {{"f1": "E1"}}
            # Delete C ...
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-key key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "C{PAD_1M}"}}
            """

        return [
            Testdrive(schema_prelude + dedent(phase))
            for phase in (first_phase, second_phase)
        ]

    def validate(self) -> Testdrive:
        """Expect only keys A, D and E, each 1024001 characters long and ending in Z."""
        expected_rows = dedent(
            """
            > SELECT * FROM upsert_wide_key_view
            A Z 1024001 A3
            D Z 1024001 D1
            E Z 1024001 E1
            """
        )
        return Testdrive(expected_rows)
Ancestors
Class variables
var externally_idempotent : bool
Methods
def initialize(self) ‑> Testdrive
-
Expand source code Browse git
def initialize(self) -> Testdrive:
    """Create the wide-key topic, ingest keys A, B and C, then define source and view."""
    schema_prelude = dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
    # Each key is one letter followed by PAD_1M; the view exposes only the
    # key's first character, last character and length, plus the value f1.
    setup_script = dedent(
        f"""
        $ kafka-create-topic topic=upsert-wide-key
        $ kafka-ingest format=avro key-format=avro topic=upsert-wide-key key-schema=${{keyschema}} schema=${{schema}}
        {{"key1": "A{PAD_1M}"}} {{"f1": "A1"}}
        {{"key1": "B{PAD_1M}"}} {{"f1": "B1"}}
        {{"key1": "C{PAD_1M}"}} {{"f1": "C1"}}
        > CREATE SOURCE upsert_wide_key
        FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-wide-key-${{testdrive.seed}}')
        FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
        ENVELOPE UPSERT
        > CREATE MATERIALIZED VIEW upsert_wide_key_view AS
        SELECT LEFT(key1, 1), RIGHT(key1, 1),
        LENGTH(key1), f1
        FROM upsert_wide_key
        """
    )
    return Testdrive(schema_prelude + setup_script)
def manipulate(self) ‑> list[Testdrive]
-
Expand source code Browse git
def manipulate(self) -> list[Testdrive]:
    """Return two phases that update key A, add a new key and send a key-only record."""
    schema_prelude = dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)

    # Phase 1: A -> A2, new key D, key-only record for B (marked as a delete in the script).
    first_phase = f"""
        $ kafka-ingest format=avro key-format=avro topic=upsert-wide-key key-schema=${{keyschema}} schema=${{schema}}
        {{"key1": "A{PAD_1M}"}} {{"f1": "A2"}}
        {{"key1": "D{PAD_1M}"}} {{"f1": "D1"}}
        # Delete B ...
        $ kafka-ingest format=avro key-format=avro topic=upsert-wide-key key-schema=${{keyschema}} schema=${{schema}}
        {{"key1": "B{PAD_1M}"}}
        """

    # Phase 2: A -> A3, new key E, key-only record for C (marked as a delete in the script).
    second_phase = f"""
        $ kafka-ingest format=avro key-format=avro topic=upsert-wide-key key-schema=${{keyschema}} schema=${{schema}}
        {{"key1": "A{PAD_1M}"}} {{"f1": "A3"}}
        {{"key1": "E{PAD_1M}"}} {{"f1": "E1"}}
        # Delete C ...
        $ kafka-ingest format=avro key-format=avro topic=upsert-wide-key key-schema=${{keyschema}} schema=${{schema}}
        {{"key1": "C{PAD_1M}"}}
        """

    return [
        Testdrive(schema_prelude + dedent(phase))
        for phase in (first_phase, second_phase)
    ]
def validate(self) ‑> Testdrive
-
Expand source code Browse git
def validate(self) -> Testdrive:
    """Expect only keys A, D and E, each 1024001 characters long and ending in Z."""
    expected_rows = dedent(
        """
        > SELECT * FROM upsert_wide_key_view
        A Z 1024001 A3
        D Z 1024001 D1
        E Z 1024001 E1
        """
    )
    return Testdrive(expected_rows)
class UpsertWideValue (base_version: MzVersion, rng: random.Random | None)
-
Perform upsert over records with a very long/wide value.
Expand source code Browse git
@externally_idempotent(False)
class UpsertWideValue(Check):
    """Exercise ENVELOPE UPSERT on a Kafka source whose values are very large.

    Keys "A", "B" and "C" are overwritten with the PAD_100K, PAD_500K and
    PAD_1M payloads across the phases. The materialized view keeps only the
    first character, last character and length of each value, so the expected
    rows stay small while still identifying which payload is current.
    """

    # NOTE(review): this snippet was collapsed onto one line; the line breaks
    # inside the script literals are reconstructed, and presumably Testdrive
    # also needs blank lines between commands -- confirm against the repository.

    def initialize(self) -> Testdrive:
        """Create the topic, ingest key "A" with PAD_100K, then define source and view."""
        schema_prelude = dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
        setup_script = dedent(
            f"""
            $ kafka-create-topic topic=upsert-wide-value
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "A"}} {{"f1": "{PAD_100K}"}}
            > CREATE SOURCE upsert_wide_value
            FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-wide-value-${{testdrive.seed}}')
            FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
            ENVELOPE UPSERT
            > CREATE MATERIALIZED VIEW upsert_wide_value_view AS
            SELECT LEFT(f1, 1), RIGHT(f1, 1),
            LENGTH(f1)
            FROM upsert_wide_value
            """
        )
        return Testdrive(schema_prelude + setup_script)

    def manipulate(self) -> list[Testdrive]:
        """Return two ingestion phases that rotate the payloads across keys A, B and C."""
        schema_prelude = dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)

        # Phase 1: A -> PAD_1M, B -> PAD_500K, C -> PAD_1M.
        first_phase = f"""
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "A"}} {{"f1": "{PAD_1M}"}}
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "B"}} {{"f1": "{PAD_500K}"}}
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "C"}} {{"f1": "{PAD_1M}"}}
            """

        # Phase 2: A -> PAD_500K, B -> PAD_1M, C -> PAD_100K (the state validate() expects).
        second_phase = f"""
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "A"}} {{"f1": "{PAD_500K}"}}
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "B"}} {{"f1": "{PAD_1M}"}}
            $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
            {{"key1": "C"}} {{"f1": "{PAD_100K}"}}
            """

        return [
            Testdrive(schema_prelude + dedent(phase))
            for phase in (first_phase, second_phase)
        ]

    def validate(self) -> Testdrive:
        """Expect one row per payload: first character, last character and length."""
        expected_rows = dedent(
            """
            > SELECT * FROM upsert_wide_value_view
            X X 102400
            Y Y 512000
            Z Z 1024000
            """
        )
        return Testdrive(expected_rows)
Ancestors
Class variables
var externally_idempotent : bool
Methods
def initialize(self) ‑> Testdrive
-
Expand source code Browse git
def initialize(self) -> Testdrive:
    """Create the wide-value topic, ingest key "A" with PAD_100K, then define source and view."""
    schema_prelude = dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
    # The view exposes only the first character, last character and length of
    # f1, which is enough to tell the padding payloads apart.
    setup_script = dedent(
        f"""
        $ kafka-create-topic topic=upsert-wide-value
        $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
        {{"key1": "A"}} {{"f1": "{PAD_100K}"}}
        > CREATE SOURCE upsert_wide_value
        FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-wide-value-${{testdrive.seed}}')
        FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
        ENVELOPE UPSERT
        > CREATE MATERIALIZED VIEW upsert_wide_value_view AS
        SELECT LEFT(f1, 1), RIGHT(f1, 1),
        LENGTH(f1)
        FROM upsert_wide_value
        """
    )
    return Testdrive(schema_prelude + setup_script)
def manipulate(self) ‑> list[Testdrive]
-
Expand source code Browse git
def manipulate(self) -> list[Testdrive]:
    """Return two ingestion phases that rotate the payloads across keys A, B and C."""
    schema_prelude = dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)

    # Phase 1: A -> PAD_1M, B -> PAD_500K, C -> PAD_1M.
    first_phase = f"""
        $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
        {{"key1": "A"}} {{"f1": "{PAD_1M}"}}
        $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
        {{"key1": "B"}} {{"f1": "{PAD_500K}"}}
        $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
        {{"key1": "C"}} {{"f1": "{PAD_1M}"}}
        """

    # Phase 2: A -> PAD_500K, B -> PAD_1M, C -> PAD_100K.
    second_phase = f"""
        $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
        {{"key1": "A"}} {{"f1": "{PAD_500K}"}}
        $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
        {{"key1": "B"}} {{"f1": "{PAD_1M}"}}
        $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
        {{"key1": "C"}} {{"f1": "{PAD_100K}"}}
        """

    return [
        Testdrive(schema_prelude + dedent(phase))
        for phase in (first_phase, second_phase)
    ]
def validate(self) ‑> Testdrive
-
Expand source code Browse git
def validate(self) -> Testdrive:
    """Expect one row per payload: first character, last character and length."""
    expected_rows = dedent(
        """
        > SELECT * FROM upsert_wide_value_view
        X X 102400
        Y Y 512000
        Z Z 1024000
        """
    )
    return Testdrive(expected_rows)