Module materialize.checks.all_checks.rename_source
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from textwrap import dedent
from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check, externally_idempotent
@externally_idempotent(False)
class RenameSource(Check):
def _source_schema(self) -> str:
return dedent(
"""
$ set rename-source-schema={
"type" : "record",
"name" : "test",
"fields" : [
{"name":"f1", "type":"string"}
]
}
"""
)
def initialize(self) -> Testdrive:
return Testdrive(
self._source_schema()
+ dedent(
"""
$ kafka-create-topic topic=rename-source
$ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema}
{"f1": "A"}
> CREATE SOURCE rename_source1
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-rename-source-${testdrive.seed}')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE NONE
$ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema}
{"f1": "B"}
> CREATE MATERIALIZED VIEW rename_source_view AS SELECT DISTINCT f1 FROM rename_source1;
$ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema}
{"f1": "C"}
"""
)
)
def manipulate(self) -> list[Testdrive]:
return [
Testdrive(self._source_schema() + dedent(s))
for s in [
"""
$ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema}
{"f1": "D"}
> ALTER SOURCE rename_source1 RENAME to rename_source2;
$ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema}
{"f1": "E"}
""",
"""
# When upgrading from old version without roles the source is
# owned by default_role, thus we have to change the owner
# before dropping it:
$[version>=4700] postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SOURCE rename_source2 OWNER TO materialize;
$ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema}
{"f1": "F"}
> ALTER SOURCE rename_source2 RENAME to rename_source3;
$ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema}
{"f1": "G"}
""",
]
]
def validate(self) -> Testdrive:
return Testdrive(
dedent(
"""
> SELECT * FROM rename_source3;
A
B
C
D
E
F
G
> SELECT * FROM rename_source_view;
A
B
C
D
E
F
G
"""
)
)
Classes
class RenameSource (base_version: MzVersion, rng: random.Random | None)
-
Expand source code Browse git
@externally_idempotent(False) class RenameSource(Check): def _source_schema(self) -> str: return dedent( """ $ set rename-source-schema={ "type" : "record", "name" : "test", "fields" : [ {"name":"f1", "type":"string"} ] } """ ) def initialize(self) -> Testdrive: return Testdrive( self._source_schema() + dedent( """ $ kafka-create-topic topic=rename-source $ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema} {"f1": "A"} > CREATE SOURCE rename_source1 FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-rename-source-${testdrive.seed}') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE NONE $ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema} {"f1": "B"} > CREATE MATERIALIZED VIEW rename_source_view AS SELECT DISTINCT f1 FROM rename_source1; $ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema} {"f1": "C"} """ ) ) def manipulate(self) -> list[Testdrive]: return [ Testdrive(self._source_schema() + dedent(s)) for s in [ """ $ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema} {"f1": "D"} > ALTER SOURCE rename_source1 RENAME to rename_source2; $ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema} {"f1": "E"} """, """ # When upgrading from old version without roles the source is # owned by default_role, thus we have to change the owner # before dropping it: $[version>=4700] postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} ALTER SOURCE rename_source2 OWNER TO materialize; $ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema} {"f1": "F"} > ALTER SOURCE rename_source2 RENAME to rename_source3; $ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema} {"f1": "G"} """, ] ] def validate(self) -> Testdrive: return Testdrive( dedent( """ > SELECT * FROM rename_source3; A B C D E F G > SELECT * FROM rename_source_view; A B C D E F G """ ) )
Ancestors
Class variables
var externally_idempotent : bool
Methods
def initialize(self) ‑> Testdrive
-
Expand source code Browse git
def initialize(self) -> Testdrive: return Testdrive( self._source_schema() + dedent( """ $ kafka-create-topic topic=rename-source $ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema} {"f1": "A"} > CREATE SOURCE rename_source1 FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-rename-source-${testdrive.seed}') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE NONE $ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema} {"f1": "B"} > CREATE MATERIALIZED VIEW rename_source_view AS SELECT DISTINCT f1 FROM rename_source1; $ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema} {"f1": "C"} """ ) )
def manipulate(self) ‑> list[Testdrive]
-
Expand source code Browse git
def manipulate(self) -> list[Testdrive]: return [ Testdrive(self._source_schema() + dedent(s)) for s in [ """ $ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema} {"f1": "D"} > ALTER SOURCE rename_source1 RENAME to rename_source2; $ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema} {"f1": "E"} """, """ # When upgrading from old version without roles the source is # owned by default_role, thus we have to change the owner # before dropping it: $[version>=4700] postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} ALTER SOURCE rename_source2 OWNER TO materialize; $ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema} {"f1": "F"} > ALTER SOURCE rename_source2 RENAME to rename_source3; $ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema} {"f1": "G"} """, ] ]
def validate(self) ‑> Testdrive
-
Expand source code Browse git
def validate(self) -> Testdrive: return Testdrive( dedent( """ > SELECT * FROM rename_source3; A B C D E F G > SELECT * FROM rename_source_view; A B C D E F G """ ) )