misc.python.materialize.checks.mzcompose_actions

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10import json
 11from textwrap import dedent
 12from typing import TYPE_CHECKING, Any
 13
 14from materialize import MZ_ROOT
 15from materialize.checks.actions import Action
 16from materialize.checks.executors import Executor
 17from materialize.docker import image_registry
 18from materialize.mz_version import MzVersion
 19from materialize.mzcompose.services.clusterd import Clusterd
 20from materialize.mzcompose.services.materialized import DeploymentStatus, Materialized
 21from materialize.mzcompose.services.ssh_bastion_host import (
 22    setup_default_ssh_test_connection,
 23)
 24
 25if TYPE_CHECKING:
 26    from materialize.checks.scenarios import Scenario
 27
 28
 29class MzcomposeAction(Action):
 30    def join(self, e: Executor) -> None:
 31        # Most of these actions are already blocking
 32        pass
 33
 34
 35class StartMz(MzcomposeAction):
 36    def __init__(
 37        self,
 38        scenario: "Scenario",
 39        tag: MzVersion | None = None,
 40        environment_extra: list[str] = [],
 41        system_parameter_defaults: dict[str, str] | None = None,
 42        additional_system_parameter_defaults: dict[str, str] = {},
 43        system_parameter_version: MzVersion | None = None,
 44        mz_service: str | None = None,
 45        platform: str | None = None,
 46        healthcheck: list[str] | None = None,
 47        deploy_generation: int | None = None,
 48        restart: str | None = None,
 49        force_migrations: str | None = None,
 50        publish: bool | None = None,
 51    ) -> None:
 52        if healthcheck is None:
 53            healthcheck = ["CMD", "curl", "-f", "localhost:6878/api/readyz"]
 54        self.tag = tag
 55        self.environment_extra = environment_extra
 56        self.system_parameter_defaults = system_parameter_defaults
 57        self.additional_system_parameter_defaults = additional_system_parameter_defaults
 58        self.system_parameter_version = system_parameter_version or tag
 59        self.healthcheck = healthcheck
 60        self.mz_service = mz_service
 61        self.platform = platform
 62        self.deploy_generation = deploy_generation
 63        self.restart = restart
 64        self.force_migrations = force_migrations
 65        self.publish = publish
 66        self.scenario = scenario
 67
 68    def execute(self, e: Executor) -> None:
 69        c = e.mzcompose_composition()
 70
 71        image = (
 72            f"{image_registry()}/materialized:{self.tag}"
 73            if self.tag is not None
 74            else None
 75        )
 76        print(f"Starting Mz using image {image}, mz_service {self.mz_service}")
 77
 78        listeners_config_path = (
 79            f"{MZ_ROOT}/src/materialized/ci/listener_configs/testdrive.json"
 80        )
 81
 82        if not self.tag or self.tag >= MzVersion.parse_mz("v0.158.0-dev"):
 83            listeners_config_path = (
 84                f"{MZ_ROOT}/src/materialized/ci/listener_configs/testdrive_sasl.json"
 85            )
 86
 87        mz = Materialized(
 88            name=self.mz_service,
 89            image=image,
 90            external_metadata_store=True,
 91            external_blob_store=True,
 92            blob_store_is_azure=self.scenario.features.azurite_enabled(),
 93            environment_extra=self.environment_extra,
 94            system_parameter_defaults=self.system_parameter_defaults,
 95            additional_system_parameter_defaults=self.additional_system_parameter_defaults,
 96            system_parameter_version=self.system_parameter_version,
 97            sanity_restart=False,
 98            platform=self.platform,
 99            healthcheck=self.healthcheck,
100            deploy_generation=self.deploy_generation,
101            restart=self.restart,
102            force_migrations=self.force_migrations,
103            publish=self.publish,
104            default_replication_factor=2,
105            support_external_clusterd=True,
106            listeners_config_path=listeners_config_path,
107        )
108
109        # Don't fail since we are careful to explicitly kill and collect logs
110        # of the services thus started
111        with c.override(mz, fail_on_new_service=False):
112            c.up("materialized" if self.mz_service is None else self.mz_service)
113
114            # If we start up Materialize with a deploy-generation , then it
115            # stays in a stuck state when the preflight-check is completed. So
116            # we can't connect to it yet to run any commands.
117            if self.deploy_generation:
118                return
119
120            # This should live in ssh.py and alter_connection.py, but accessing the
121            # ssh bastion host from inside a check is not possible currently.
122            for i in range(4):
123                ssh_tunnel_name = f"ssh_tunnel_{i}"
124                setup_default_ssh_test_connection(
125                    c, ssh_tunnel_name, mz_service=self.mz_service
126                )
127
128            mz_version = MzVersion.parse_mz(c.query_mz_version(service=self.mz_service))
129            if self.tag:
130                assert (
131                    self.tag == mz_version
132                ), f"Materialize version mismatch, expected {self.tag}, but got {mz_version}"
133            else:
134                version_cargo = MzVersion.parse_cargo()
135                assert (
136                    version_cargo == mz_version
137                ), f"Materialize version mismatch, expected {version_cargo}, but got {mz_version}"
138
139            e.current_mz_version = mz_version
140
141
class ConfigureMz(MzcomposeAction):
    """Apply system settings and create the Kafka and schema-registry connections."""

    def __init__(self, scenario: "Scenario", mz_service: str | None = None) -> None:
        """Store the scenario and the target service; no testdrive run exists yet."""
        self.handle: Any | None = None
        self.mz_service = mz_service
        self.scenario = scenario

    def execute(self, e: Executor) -> None:
        """Build the configuration testdrive script and hand it to the executor.

        Only statements not yet recorded in ``e.system_settings`` are sent.
        They are emitted in sorted order so the generated script is the same
        on every run. The statements sent are then added to
        ``e.system_settings``.
        """
        script = dedent(
            """
            # Run any query to have the materialize user implicitly created if
            # it didn't exist yet. Required for the GRANT later.
            > SELECT 1;
            1
            """
        )

        desired_settings = {
            "ALTER SYSTEM SET max_tables = 1000;",
            "ALTER SYSTEM SET max_sinks = 1000;",
            "ALTER SYSTEM SET max_sources = 1000;",
            "ALTER SYSTEM SET max_materialized_views = 1000;",
            "ALTER SYSTEM SET max_objects_per_schema = 1000;",
            "ALTER SYSTEM SET max_secrets = 1000;",
            "ALTER SYSTEM SET max_clusters = 1000;",
            # Since we already test with RBAC enabled, we have to give the
            # materialize user the relevant attributes and privileges so the
            # existing tests keep working.
            "GRANT ALL PRIVILEGES ON SYSTEM TO materialize;",
            "GRANT CREATE ON DATABASE materialize TO materialize;",
            "GRANT CREATE ON SCHEMA materialize.public TO materialize;",
            "GRANT CREATE ON CLUSTER quickstart TO materialize;",
            # Do not enable this by default for all checks.
            "ALTER SYSTEM SET enable_rbac_checks TO false;",
        }

        # Skip whatever an earlier run has already applied.
        pending_settings = desired_settings - e.system_settings

        if pending_settings:
            # Sets iterate in arbitrary order; sort for a reproducible script.
            script += (
                "$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}\n"
                + "\n".join(sorted(pending_settings))
            )

        kafka_broker = "BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT"
        script += dedent(
            f"""
            > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA {kafka_broker}

            > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}';
            """
        )

        self.handle = e.testdrive(input=script, mz_service=self.mz_service)
        e.system_settings.update(pending_settings)

    def join(self, e: Executor) -> None:
        """Join the testdrive run started by ``execute``."""
        e.join(self.handle)
203
204
class SetupSqlServerTesting(MzcomposeAction):
    """Run the SQL Server CDC setup testdrive script against a service."""

    def __init__(self, scenario: "Scenario", mz_service: str | None = None) -> None:
        """Store the scenario and the target service; no testdrive run exists yet."""
        self.scenario = scenario
        self.mz_service = mz_service
        self.handle: Any | None = None

    def execute(self, e: Executor) -> None:
        """Read ``setup.td`` from the repository and submit it to testdrive."""
        setup_script = MZ_ROOT / "test" / "sql-server-cdc" / "setup" / "setup.td"
        with open(setup_script) as script_file:
            self.handle = e.testdrive(
                input=script_file.read(), mz_service=self.mz_service
            )

    def join(self, e: Executor) -> None:
        """Join the testdrive run started by ``execute``."""
        e.join(self.handle)
217
218
class KillMz(MzcomposeAction):
    """Kill a Materialized service, optionally capturing its logs afterwards."""

    def __init__(
        self, mz_service: str = "materialized", capture_logs: bool = False
    ) -> None:
        """Remember which service to kill and whether to capture its logs."""
        self.capture_logs = capture_logs
        self.mz_service = mz_service

    def execute(self, e: Executor) -> None:
        """Kill the service and, if requested, capture its logs."""
        composition = e.mzcompose_composition()
        service_override = Materialized(name=self.mz_service)

        # Don't fail since we are careful to explicitly kill and collect logs
        # of the services thus started
        with composition.override(service_override, fail_on_new_service=False):
            composition.kill(self.mz_service, wait=True)

            if self.capture_logs:
                composition.capture_logs(self.mz_service)
236
237
class Stop(MzcomposeAction):
    """Stop a single service of the composition."""

    def __init__(self, service: str = "materialized") -> None:
        """Remember the name of the service to stop."""
        self.service = service

    def execute(self, e: Executor) -> None:
        """Stop the configured service, passing ``wait=True``."""
        e.mzcompose_composition().stop(self.service, wait=True)
245
246
class Down(MzcomposeAction):
    """Call ``down()`` on the composition."""

    def execute(self, e: Executor) -> None:
        """Take the composition down."""
        e.mzcompose_composition().down()
251
252
class UseClusterdCompute(MzcomposeAction):
    """Point replica ``quickstart.r1`` at the external ``clusterd_compute_1``."""

    def __init__(self, scenario: "Scenario") -> None:
        """Record the scenario's base version."""
        self.base_version = scenario.base_version()

    def execute(self, e: Executor) -> None:
        """Enable unorchestrated replicas, then recreate ``quickstart.r1``."""
        composition = e.mzcompose_composition()
        # Both statements are issued as mz_system on port 6877.
        system_conn = {"port": 6877, "user": "mz_system"}

        composition.sql(
            "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = on;",
            **system_conn,
        )
        composition.sql(
            """
            ALTER CLUSTER quickstart SET (MANAGED = false);
            DROP CLUSTER REPLICA quickstart.r1;
            CREATE CLUSTER REPLICA quickstart.r1
                STORAGECTL ADDRESSES ['clusterd_compute_1:2100'],
                STORAGE ADDRESSES ['clusterd_compute_1:2103'],
                COMPUTECTL ADDRESSES ['clusterd_compute_1:2101'],
                COMPUTE ADDRESSES ['clusterd_compute_1:2102'],
                WORKERS 1;
            """,
            **system_conn,
        )
279
280
class KillClusterdCompute(MzcomposeAction):
    """Kill the ``clusterd_compute_1`` service, optionally capturing its logs."""

    def __init__(self, capture_logs: bool = False) -> None:
        """Remember whether logs should be captured after the kill."""
        self.capture_logs = capture_logs

    def execute(self, e: Executor) -> None:
        """Kill ``clusterd_compute_1`` and, if requested, capture its logs."""
        composition = e.mzcompose_composition()
        clusterd = Clusterd(name="clusterd_compute_1")
        with composition.override(clusterd):
            composition.kill("clusterd_compute_1")

            if self.capture_logs:
                composition.capture_logs("clusterd_compute_1")
292
293
class StartClusterdCompute(MzcomposeAction):
    """Start the ``clusterd_compute_1`` service, optionally at a given version."""

    def __init__(self, tag: MzVersion | None = None) -> None:
        """Remember the image tag to run; without one no image is specified."""
        self.tag = tag

    def execute(self, e: Executor) -> None:
        """Bring ``clusterd_compute_1`` up, overriding the image when tagged."""
        c = e.mzcompose_composition()

        clusterd = Clusterd(name="clusterd_compute_1")
        if self.tag:
            # Resolve the image through image_registry(), as StartMz does for
            # materialized, instead of a hard-coded registry prefix.
            clusterd = Clusterd(
                name="clusterd_compute_1",
                image=f"{image_registry()}/clusterd:{self.tag}",
            )
        print(f"Starting Compute using image {clusterd.config.get('image')}")

        with c.override(clusterd):
            c.up("clusterd_compute_1")
311
312
class RestartRedpandaDebezium(MzcomposeAction):
    """Restarts Redpanda and Debezium. Debezium is unable to survive Redpanda restarts so the two go together."""

    def execute(self, e: Executor) -> None:
        """Kill and bring back up Redpanda first, then Debezium."""
        composition = e.mzcompose_composition()

        for service_name in ("redpanda", "debezium"):
            composition.kill(service_name)
            composition.up(service_name)
322
323
class RestartCockroach(MzcomposeAction):
    """Restart the composition's metadata store service."""

    def execute(self, e: Executor) -> None:
        """Kill the service named by ``metadata_store()`` and bring it back up."""
        composition = e.mzcompose_composition()

        composition.kill(composition.metadata_store())
        composition.up(composition.metadata_store())
330
331
class RestartSourcePostgres(MzcomposeAction):
    """Restart the ``postgres`` service."""

    def execute(self, e: Executor) -> None:
        """Kill ``postgres`` and bring it back up."""
        composition = e.mzcompose_composition()

        composition.kill("postgres")
        composition.up("postgres")
338
339
class KillClusterdStorage(MzcomposeAction):
    """Kill any ``clusterd`` process inside the ``materialized`` container."""

    def execute(self, e: Executor) -> None:
        """Send SIGKILL to ``clusterd``, tolerating that none may be running."""
        # Depending on the workload, clusterd may not be running, hence the || true
        kill_command = "kill -9 `pidof clusterd` || true"
        e.mzcompose_composition().exec("materialized", "bash", "-c", kill_command)
346
347
class DropCreateDefaultReplica(MzcomposeAction):
    """Drop and recreate replica ``quickstart.r1`` on an unmanaged cluster."""

    def __init__(self, scenario: "Scenario") -> None:
        """Record the scenario's base version."""
        self.base_version = scenario.base_version()

    def execute(self, e: Executor) -> None:
        """Run the drop/create statements as ``mz_system`` on port 6877."""
        composition = e.mzcompose_composition()
        system_conn = {"port": 6877, "user": "mz_system"}

        composition.sql(
            """
            ALTER CLUSTER quickstart SET (MANAGED = false);
            DROP CLUSTER REPLICA quickstart.r1;
            CREATE CLUSTER REPLICA quickstart.r1 SIZE 'scale=1,workers=1';
            """,
            **system_conn,
        )
364
365
class WaitReadyMz(MzcomposeAction):
    """Wait until environmentd is ready, see https://github.com/MaterializeInc/cloud/blob/main/doc/design/20230418_upgrade_orchestration.md#get-apileaderstatus"""

    def __init__(self, mz_service: str = "materialized") -> None:
        """Remember which service to wait for."""
        self.mz_service = mz_service

    def execute(self, e: Executor) -> None:
        """Await the ``READY_TO_PROMOTE`` deployment status of the service."""
        composition = e.mzcompose_composition()
        composition.await_mz_deployment_status(
            DeploymentStatus.READY_TO_PROMOTE, self.mz_service
        )
376
377
class PromoteMz(MzcomposeAction):
    """Promote environmentd to leader, see https://github.com/MaterializeInc/cloud/blob/main/doc/design/20230418_upgrade_orchestration.md#post-apileaderpromote"""

    def __init__(self, mz_service: str = "materialized") -> None:
        """Remember which service to promote."""
        self.mz_service = mz_service

    def execute(self, e: Executor) -> None:
        """POST to the promote endpoint, await leadership, record the version."""
        composition = e.mzcompose_composition()

        # Issue the promotion request with curl from inside the service.
        response = composition.exec(
            self.mz_service,
            "curl",
            "-s",
            "-X",
            "POST",
            "http://127.0.0.1:6878/api/leader/promote",
            capture=True,
        )
        result = json.loads(response.stdout)
        assert result["result"] == "Success", f"Unexpected result {result}"

        # Wait until new Materialize is ready to handle queries
        composition.await_mz_deployment_status(
            DeploymentStatus.IS_LEADER, self.mz_service
        )

        reported_version = composition.query_mz_version(service=self.mz_service)
        e.current_mz_version = MzVersion.parse_mz(reported_version)
405
406
class SystemVarChange(MzcomposeAction):
    """Changes a system var."""

    def __init__(self, name: str, value: str):
        """Remember the variable name and the value to assign to it."""
        self.value = value
        self.name = name

    def execute(self, e: Executor) -> None:
        """Issue ``ALTER SYSTEM SET`` as ``mz_system`` on port 6877."""
        composition = e.mzcompose_composition()
        statement = f"ALTER SYSTEM SET {self.name} = {self.value};"

        composition.sql(
            statement,
            port=6877,
            user="mz_system",
        )
class MzcomposeAction(materialize.checks.actions.Action):
class MzcomposeAction(Action):
    """Base class for actions that operate on the mzcompose composition."""

    def join(self, e: Executor) -> None:
        """Do nothing: most of these actions are already blocking."""
        return None
def join(self, e: materialize.checks.executors.Executor) -> None:
31    def join(self, e: Executor) -> None:
32        # Most of these actions are already blocking
33        pass
class StartMz(MzcomposeAction):
 36class StartMz(MzcomposeAction):
 37    def __init__(
 38        self,
 39        scenario: "Scenario",
 40        tag: MzVersion | None = None,
 41        environment_extra: list[str] = [],
 42        system_parameter_defaults: dict[str, str] | None = None,
 43        additional_system_parameter_defaults: dict[str, str] = {},
 44        system_parameter_version: MzVersion | None = None,
 45        mz_service: str | None = None,
 46        platform: str | None = None,
 47        healthcheck: list[str] | None = None,
 48        deploy_generation: int | None = None,
 49        restart: str | None = None,
 50        force_migrations: str | None = None,
 51        publish: bool | None = None,
 52    ) -> None:
 53        if healthcheck is None:
 54            healthcheck = ["CMD", "curl", "-f", "localhost:6878/api/readyz"]
 55        self.tag = tag
 56        self.environment_extra = environment_extra
 57        self.system_parameter_defaults = system_parameter_defaults
 58        self.additional_system_parameter_defaults = additional_system_parameter_defaults
 59        self.system_parameter_version = system_parameter_version or tag
 60        self.healthcheck = healthcheck
 61        self.mz_service = mz_service
 62        self.platform = platform
 63        self.deploy_generation = deploy_generation
 64        self.restart = restart
 65        self.force_migrations = force_migrations
 66        self.publish = publish
 67        self.scenario = scenario
 68
 69    def execute(self, e: Executor) -> None:
 70        c = e.mzcompose_composition()
 71
 72        image = (
 73            f"{image_registry()}/materialized:{self.tag}"
 74            if self.tag is not None
 75            else None
 76        )
 77        print(f"Starting Mz using image {image}, mz_service {self.mz_service}")
 78
 79        listeners_config_path = (
 80            f"{MZ_ROOT}/src/materialized/ci/listener_configs/testdrive.json"
 81        )
 82
 83        if not self.tag or self.tag >= MzVersion.parse_mz("v0.158.0-dev"):
 84            listeners_config_path = (
 85                f"{MZ_ROOT}/src/materialized/ci/listener_configs/testdrive_sasl.json"
 86            )
 87
 88        mz = Materialized(
 89            name=self.mz_service,
 90            image=image,
 91            external_metadata_store=True,
 92            external_blob_store=True,
 93            blob_store_is_azure=self.scenario.features.azurite_enabled(),
 94            environment_extra=self.environment_extra,
 95            system_parameter_defaults=self.system_parameter_defaults,
 96            additional_system_parameter_defaults=self.additional_system_parameter_defaults,
 97            system_parameter_version=self.system_parameter_version,
 98            sanity_restart=False,
 99            platform=self.platform,
100            healthcheck=self.healthcheck,
101            deploy_generation=self.deploy_generation,
102            restart=self.restart,
103            force_migrations=self.force_migrations,
104            publish=self.publish,
105            default_replication_factor=2,
106            support_external_clusterd=True,
107            listeners_config_path=listeners_config_path,
108        )
109
110        # Don't fail since we are careful to explicitly kill and collect logs
111        # of the services thus started
112        with c.override(mz, fail_on_new_service=False):
113            c.up("materialized" if self.mz_service is None else self.mz_service)
114
115            # If we start up Materialize with a deploy-generation , then it
116            # stays in a stuck state when the preflight-check is completed. So
117            # we can't connect to it yet to run any commands.
118            if self.deploy_generation:
119                return
120
121            # This should live in ssh.py and alter_connection.py, but accessing the
122            # ssh bastion host from inside a check is not possible currently.
123            for i in range(4):
124                ssh_tunnel_name = f"ssh_tunnel_{i}"
125                setup_default_ssh_test_connection(
126                    c, ssh_tunnel_name, mz_service=self.mz_service
127                )
128
129            mz_version = MzVersion.parse_mz(c.query_mz_version(service=self.mz_service))
130            if self.tag:
131                assert (
132                    self.tag == mz_version
133                ), f"Materialize version mismatch, expected {self.tag}, but got {mz_version}"
134            else:
135                version_cargo = MzVersion.parse_cargo()
136                assert (
137                    version_cargo == mz_version
138                ), f"Materialize version mismatch, expected {version_cargo}, but got {mz_version}"
139
140            e.current_mz_version = mz_version
StartMz( scenario: materialize.checks.scenarios.Scenario, tag: materialize.mz_version.MzVersion | None = None, environment_extra: list[str] = [], system_parameter_defaults: dict[str, str] | None = None, additional_system_parameter_defaults: dict[str, str] = {}, system_parameter_version: materialize.mz_version.MzVersion | None = None, mz_service: str | None = None, platform: str | None = None, healthcheck: list[str] | None = None, deploy_generation: int | None = None, restart: str | None = None, force_migrations: str | None = None, publish: bool | None = None)
37    def __init__(
38        self,
39        scenario: "Scenario",
40        tag: MzVersion | None = None,
41        environment_extra: list[str] = [],
42        system_parameter_defaults: dict[str, str] | None = None,
43        additional_system_parameter_defaults: dict[str, str] = {},
44        system_parameter_version: MzVersion | None = None,
45        mz_service: str | None = None,
46        platform: str | None = None,
47        healthcheck: list[str] | None = None,
48        deploy_generation: int | None = None,
49        restart: str | None = None,
50        force_migrations: str | None = None,
51        publish: bool | None = None,
52    ) -> None:
53        if healthcheck is None:
54            healthcheck = ["CMD", "curl", "-f", "localhost:6878/api/readyz"]
55        self.tag = tag
56        self.environment_extra = environment_extra
57        self.system_parameter_defaults = system_parameter_defaults
58        self.additional_system_parameter_defaults = additional_system_parameter_defaults
59        self.system_parameter_version = system_parameter_version or tag
60        self.healthcheck = healthcheck
61        self.mz_service = mz_service
62        self.platform = platform
63        self.deploy_generation = deploy_generation
64        self.restart = restart
65        self.force_migrations = force_migrations
66        self.publish = publish
67        self.scenario = scenario
tag
environment_extra
system_parameter_defaults
additional_system_parameter_defaults
system_parameter_version
healthcheck
mz_service
platform
deploy_generation
restart
force_migrations
publish
scenario
def execute(self, e: materialize.checks.executors.Executor) -> None:
 69    def execute(self, e: Executor) -> None:
 70        c = e.mzcompose_composition()
 71
 72        image = (
 73            f"{image_registry()}/materialized:{self.tag}"
 74            if self.tag is not None
 75            else None
 76        )
 77        print(f"Starting Mz using image {image}, mz_service {self.mz_service}")
 78
 79        listeners_config_path = (
 80            f"{MZ_ROOT}/src/materialized/ci/listener_configs/testdrive.json"
 81        )
 82
 83        if not self.tag or self.tag >= MzVersion.parse_mz("v0.158.0-dev"):
 84            listeners_config_path = (
 85                f"{MZ_ROOT}/src/materialized/ci/listener_configs/testdrive_sasl.json"
 86            )
 87
 88        mz = Materialized(
 89            name=self.mz_service,
 90            image=image,
 91            external_metadata_store=True,
 92            external_blob_store=True,
 93            blob_store_is_azure=self.scenario.features.azurite_enabled(),
 94            environment_extra=self.environment_extra,
 95            system_parameter_defaults=self.system_parameter_defaults,
 96            additional_system_parameter_defaults=self.additional_system_parameter_defaults,
 97            system_parameter_version=self.system_parameter_version,
 98            sanity_restart=False,
 99            platform=self.platform,
100            healthcheck=self.healthcheck,
101            deploy_generation=self.deploy_generation,
102            restart=self.restart,
103            force_migrations=self.force_migrations,
104            publish=self.publish,
105            default_replication_factor=2,
106            support_external_clusterd=True,
107            listeners_config_path=listeners_config_path,
108        )
109
110        # Don't fail since we are careful to explicitly kill and collect logs
111        # of the services thus started
112        with c.override(mz, fail_on_new_service=False):
113            c.up("materialized" if self.mz_service is None else self.mz_service)
114
115            # If we start up Materialize with a deploy-generation , then it
116            # stays in a stuck state when the preflight-check is completed. So
117            # we can't connect to it yet to run any commands.
118            if self.deploy_generation:
119                return
120
121            # This should live in ssh.py and alter_connection.py, but accessing the
122            # ssh bastion host from inside a check is not possible currently.
123            for i in range(4):
124                ssh_tunnel_name = f"ssh_tunnel_{i}"
125                setup_default_ssh_test_connection(
126                    c, ssh_tunnel_name, mz_service=self.mz_service
127                )
128
129            mz_version = MzVersion.parse_mz(c.query_mz_version(service=self.mz_service))
130            if self.tag:
131                assert (
132                    self.tag == mz_version
133                ), f"Materialize version mismatch, expected {self.tag}, but got {mz_version}"
134            else:
135                version_cargo = MzVersion.parse_cargo()
136                assert (
137                    version_cargo == mz_version
138                ), f"Materialize version mismatch, expected {version_cargo}, but got {mz_version}"
139
140            e.current_mz_version = mz_version
Inherited Members
MzcomposeAction
join
class ConfigureMz(MzcomposeAction):
class ConfigureMz(MzcomposeAction):
    """Apply system settings and create the Kafka and schema-registry connections."""

    def __init__(self, scenario: "Scenario", mz_service: str | None = None) -> None:
        """Store the scenario and the target service; no testdrive run exists yet."""
        self.handle: Any | None = None
        self.mz_service = mz_service
        self.scenario = scenario

    def execute(self, e: Executor) -> None:
        """Build the configuration testdrive script and hand it to the executor.

        Only statements not yet recorded in ``e.system_settings`` are sent.
        They are emitted in sorted order so the generated script is the same
        on every run. The statements sent are then added to
        ``e.system_settings``.
        """
        script = dedent(
            """
            # Run any query to have the materialize user implicitly created if
            # it didn't exist yet. Required for the GRANT later.
            > SELECT 1;
            1
            """
        )

        desired_settings = {
            "ALTER SYSTEM SET max_tables = 1000;",
            "ALTER SYSTEM SET max_sinks = 1000;",
            "ALTER SYSTEM SET max_sources = 1000;",
            "ALTER SYSTEM SET max_materialized_views = 1000;",
            "ALTER SYSTEM SET max_objects_per_schema = 1000;",
            "ALTER SYSTEM SET max_secrets = 1000;",
            "ALTER SYSTEM SET max_clusters = 1000;",
            # Since we already test with RBAC enabled, we have to give the
            # materialize user the relevant attributes and privileges so the
            # existing tests keep working.
            "GRANT ALL PRIVILEGES ON SYSTEM TO materialize;",
            "GRANT CREATE ON DATABASE materialize TO materialize;",
            "GRANT CREATE ON SCHEMA materialize.public TO materialize;",
            "GRANT CREATE ON CLUSTER quickstart TO materialize;",
            # Do not enable this by default for all checks.
            "ALTER SYSTEM SET enable_rbac_checks TO false;",
        }

        # Skip whatever an earlier run has already applied.
        pending_settings = desired_settings - e.system_settings

        if pending_settings:
            # Sets iterate in arbitrary order; sort for a reproducible script.
            script += (
                "$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}\n"
                + "\n".join(sorted(pending_settings))
            )

        kafka_broker = "BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT"
        script += dedent(
            f"""
            > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA {kafka_broker}

            > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}';
            """
        )

        self.handle = e.testdrive(input=script, mz_service=self.mz_service)
        e.system_settings.update(pending_settings)

    def join(self, e: Executor) -> None:
        """Join the testdrive run started by ``execute``."""
        e.join(self.handle)
ConfigureMz( scenario: materialize.checks.scenarios.Scenario, mz_service: str | None = None)
144    def __init__(self, scenario: "Scenario", mz_service: str | None = None) -> None:
145        self.handle: Any | None = None
146        self.mz_service = mz_service
147        self.scenario = scenario
handle: typing.Any | None
mz_service
scenario
def execute(self, e: materialize.checks.executors.Executor) -> None:
149    def execute(self, e: Executor) -> None:
150        input = dedent(
151            """
152            # Run any query to have the materialize user implicitly created if
153            # it didn't exist yet. Required for the GRANT later.
154            > SELECT 1;
155            1
156            """
157        )
158
159        system_settings = {
160            "ALTER SYSTEM SET max_tables = 1000;",
161            "ALTER SYSTEM SET max_sinks = 1000;",
162            "ALTER SYSTEM SET max_sources = 1000;",
163            "ALTER SYSTEM SET max_materialized_views = 1000;",
164            "ALTER SYSTEM SET max_objects_per_schema = 1000;",
165            "ALTER SYSTEM SET max_secrets = 1000;",
166            "ALTER SYSTEM SET max_clusters = 1000;",
167        }
168
169        # Since we already test with RBAC enabled, we have to give materialize
170        # user the relevant attributes so the existing tests keep working.
171        system_settings.add("GRANT ALL PRIVILEGES ON SYSTEM TO materialize;")
172
173        # do not enable this by default for all checks
174        system_settings.add("ALTER SYSTEM SET enable_rbac_checks TO false;")
175
176        # Since we already test with RBAC enabled, we have to give materialize
177        # user the relevant privileges so the existing tests keep working.
178        system_settings.add("GRANT CREATE ON DATABASE materialize TO materialize;")
179        system_settings.add("GRANT CREATE ON SCHEMA materialize.public TO materialize;")
180        system_settings.add("GRANT CREATE ON CLUSTER quickstart TO materialize;")
181
182        system_settings = system_settings - e.system_settings
183
184        if system_settings:
185            input += (
186                "$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}\n"
187                + "\n".join(system_settings)
188            )
189
190        kafka_broker = "BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT"
191        input += dedent(
192            f"""
193            > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA {kafka_broker}
194
195            > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}';
196            """
197        )
198
199        self.handle = e.testdrive(input=input, mz_service=self.mz_service)
200        e.system_settings.update(system_settings)
def join(self, e: materialize.checks.executors.Executor) -> None:
202    def join(self, e: Executor) -> None:
203        e.join(self.handle)
class SetupSqlServerTesting(MzcomposeAction):
class SetupSqlServerTesting(MzcomposeAction):
    """Submit the SQL Server CDC setup script to testdrive."""

    def __init__(self, scenario: "Scenario", mz_service: str | None = None) -> None:
        self.scenario = scenario
        self.mz_service = mz_service
        # Filled in by execute(); join() waits on it.
        self.handle: Any | None = None

    def execute(self, e: Executor) -> None:
        """Read ``test/sql-server-cdc/setup/setup.td`` and hand it to testdrive."""
        setup_script = MZ_ROOT / "test" / "sql-server-cdc" / "setup" / "setup.td"
        with open(setup_script) as script:
            contents = script.read()
        self.handle = e.testdrive(input=contents, mz_service=self.mz_service)

    def join(self, e: Executor) -> None:
        """Wait on the handle that ``execute`` stored."""
        e.join(self.handle)
SetupSqlServerTesting( scenario: materialize.checks.scenarios.Scenario, mz_service: str | None = None)
207    def __init__(self, scenario: "Scenario", mz_service: str | None = None) -> None:
208        self.handle: Any | None = None
209        self.mz_service = mz_service
210        self.scenario = scenario
handle: typing.Any | None
mz_service
scenario
def execute(self, e: materialize.checks.executors.Executor) -> None:
212    def execute(self, e: Executor) -> None:
213        with open(MZ_ROOT / "test" / "sql-server-cdc" / "setup" / "setup.td") as f:
214            self.handle = e.testdrive(input=f.read(), mz_service=self.mz_service)
def join(self, e: materialize.checks.executors.Executor) -> None:
216    def join(self, e: Executor) -> None:
217        e.join(self.handle)
class KillMz(MzcomposeAction):
class KillMz(MzcomposeAction):
    """Kill a Materialize service and, if requested, capture its logs."""

    def __init__(
        self, mz_service: str = "materialized", capture_logs: bool = False
    ) -> None:
        self.capture_logs = capture_logs
        self.mz_service = mz_service

    def execute(self, e: Executor) -> None:
        """Kill ``self.mz_service`` (waiting for it) inside a service override."""
        composition = e.mzcompose_composition()
        target = self.mz_service

        # Don't fail since we are careful to explicitly kill and collect logs
        # of the services thus started
        with composition.override(
            Materialized(name=target), fail_on_new_service=False
        ):
            composition.kill(target, wait=True)

            if not self.capture_logs:
                return
            composition.capture_logs(target)
KillMz(mz_service: str = 'materialized', capture_logs: bool = False)
221    def __init__(
222        self, mz_service: str = "materialized", capture_logs: bool = False
223    ) -> None:
224        self.mz_service = mz_service
225        self.capture_logs = capture_logs
mz_service
capture_logs
def execute(self, e: materialize.checks.executors.Executor) -> None:
227    def execute(self, e: Executor) -> None:
228        c = e.mzcompose_composition()
229
230        # Don't fail since we are careful to explicitly kill and collect logs
231        # of the services thus started
232        with c.override(Materialized(name=self.mz_service), fail_on_new_service=False):
233            c.kill(self.mz_service, wait=True)
234
235            if self.capture_logs:
236                c.capture_logs(self.mz_service)
Inherited Members
MzcomposeAction
join
class Stop(MzcomposeAction):
class Stop(MzcomposeAction):
    """Stop a single composition service and wait for it."""

    def __init__(self, service: str = "materialized") -> None:
        # Name of the composition service that execute() stops.
        self.service = service

    def execute(self, e: Executor) -> None:
        """Ask the composition to stop ``self.service`` with ``wait=True``."""
        e.mzcompose_composition().stop(self.service, wait=True)
Stop(service: str = 'materialized')
240    def __init__(self, service: str = "materialized") -> None:
241        self.service = service
service
def execute(self, e: materialize.checks.executors.Executor) -> None:
243    def execute(self, e: Executor) -> None:
244        c = e.mzcompose_composition()
245        c.stop(self.service, wait=True)
Inherited Members
MzcomposeAction
join
class Down(MzcomposeAction):
class Down(MzcomposeAction):
    """Call ``down()`` on the executor's composition."""

    def execute(self, e: Executor) -> None:
        e.mzcompose_composition().down()
def execute(self, e: materialize.checks.executors.Executor) -> None:
249    def execute(self, e: Executor) -> None:
250        c = e.mzcompose_composition()
251        c.down()
Inherited Members
MzcomposeAction
join
class UseClusterdCompute(MzcomposeAction):
class UseClusterdCompute(MzcomposeAction):
    """Recreate ``quickstart.r1`` as a replica backed by ``clusterd_compute_1``."""

    def __init__(self, scenario: "Scenario") -> None:
        # Recorded from the scenario; execute() below does not read it.
        self.base_version = scenario.base_version()

    def execute(self, e: Executor) -> None:
        """Enable unorchestrated replicas, then swap out ``quickstart.r1``."""
        composition = e.mzcompose_composition()

        enable_unorchestrated = (
            "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = on;"
        )
        recreate_replica = """
            ALTER CLUSTER quickstart SET (MANAGED = false);
            DROP CLUSTER REPLICA quickstart.r1;
            CREATE CLUSTER REPLICA quickstart.r1
                STORAGECTL ADDRESSES ['clusterd_compute_1:2100'],
                STORAGE ADDRESSES ['clusterd_compute_1:2103'],
                COMPUTECTL ADDRESSES ['clusterd_compute_1:2101'],
                COMPUTE ADDRESSES ['clusterd_compute_1:2102'],
                WORKERS 1;
            """

        # Both statements are issued as mz_system on port 6877.
        composition.sql(enable_unorchestrated, port=6877, user="mz_system")
        composition.sql(recreate_replica, port=6877, user="mz_system")
UseClusterdCompute(scenario: materialize.checks.scenarios.Scenario)
255    def __init__(self, scenario: "Scenario") -> None:
256        self.base_version = scenario.base_version()
base_version
def execute(self, e: materialize.checks.executors.Executor) -> None:
258    def execute(self, e: Executor) -> None:
259        c = e.mzcompose_composition()
260
261        c.sql(
262            "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = on;",
263            port=6877,
264            user="mz_system",
265        )
266        c.sql(
267            """
268            ALTER CLUSTER quickstart SET (MANAGED = false);
269            DROP CLUSTER REPLICA quickstart.r1;
270            CREATE CLUSTER REPLICA quickstart.r1
271                STORAGECTL ADDRESSES ['clusterd_compute_1:2100'],
272                STORAGE ADDRESSES ['clusterd_compute_1:2103'],
273                COMPUTECTL ADDRESSES ['clusterd_compute_1:2101'],
274                COMPUTE ADDRESSES ['clusterd_compute_1:2102'],
275                WORKERS 1;
276            """,
277            port=6877,
278            user="mz_system",
279        )
Inherited Members
MzcomposeAction
join
class KillClusterdCompute(MzcomposeAction):
class KillClusterdCompute(MzcomposeAction):
    """Kill the ``clusterd_compute_1`` service, optionally capturing its logs."""

    def __init__(self, capture_logs: bool = False) -> None:
        # When true, execute() captures the service logs after the kill.
        self.capture_logs = capture_logs

    def execute(self, e: Executor) -> None:
        composition = e.mzcompose_composition()
        service = "clusterd_compute_1"

        with composition.override(Clusterd(name=service)):
            composition.kill(service)

            if not self.capture_logs:
                return
            composition.capture_logs(service)
KillClusterdCompute(capture_logs: bool = False)
283    def __init__(self, capture_logs: bool = False) -> None:
284        self.capture_logs = capture_logs
capture_logs
def execute(self, e: materialize.checks.executors.Executor) -> None:
286    def execute(self, e: Executor) -> None:
287        c = e.mzcompose_composition()
288        with c.override(Clusterd(name="clusterd_compute_1")):
289            c.kill("clusterd_compute_1")
290
291            if self.capture_logs:
292                c.capture_logs("clusterd_compute_1")
Inherited Members
MzcomposeAction
join
class StartClusterdCompute(MzcomposeAction):
class StartClusterdCompute(MzcomposeAction):
    """Bring up ``clusterd_compute_1``, optionally pinned to an image tag."""

    def __init__(self, tag: MzVersion | None = None) -> None:
        # Falsy tag: the Clusterd service is built without an explicit image.
        self.tag = tag

    def execute(self, e: Executor) -> None:
        composition = e.mzcompose_composition()
        service = "clusterd_compute_1"

        if self.tag:
            clusterd = Clusterd(
                name=service,
                image=f"materialize/clusterd:{self.tag}",
            )
        else:
            clusterd = Clusterd(name=service)
        print(f"Starting Compute using image {clusterd.config.get('image')}")

        with composition.override(clusterd):
            composition.up(service)
StartClusterdCompute(tag: materialize.mz_version.MzVersion | None = None)
296    def __init__(self, tag: MzVersion | None = None) -> None:
297        self.tag = tag
tag
def execute(self, e: materialize.checks.executors.Executor) -> None:
299    def execute(self, e: Executor) -> None:
300        c = e.mzcompose_composition()
301
302        clusterd = Clusterd(name="clusterd_compute_1")
303        if self.tag:
304            clusterd = Clusterd(
305                name="clusterd_compute_1",
306                image=f"materialize/clusterd:{self.tag}",
307            )
308        print(f"Starting Compute using image {clusterd.config.get('image')}")
309
310        with c.override(clusterd):
311            c.up("clusterd_compute_1")
Inherited Members
MzcomposeAction
join
class RestartRedpandaDebezium(MzcomposeAction):
class RestartRedpandaDebezium(MzcomposeAction):
    """Restarts Redpanda and Debezium. Debezium is unable to survive Redpanda restarts so the two go together."""

    def execute(self, e: Executor) -> None:
        composition = e.mzcompose_composition()

        # Each service is killed and brought back up before the next one.
        for name in ("redpanda", "debezium"):
            composition.kill(name)
            composition.up(name)

Restarts Redpanda and Debezium. Debezium is unable to survive Redpanda restarts so the two go together.

def execute(self, e: materialize.checks.executors.Executor) -> None:
317    def execute(self, e: Executor) -> None:
318        c = e.mzcompose_composition()
319
320        for service in ["redpanda", "debezium"]:
321            c.kill(service)
322            c.up(service)
Inherited Members
MzcomposeAction
join
class RestartCockroach(MzcomposeAction):
class RestartCockroach(MzcomposeAction):
    """Kill and restart the service named by the composition's ``metadata_store()``."""

    def execute(self, e: Executor) -> None:
        composition = e.mzcompose_composition()

        store = composition.metadata_store()
        composition.kill(store)
        composition.up(store)
def execute(self, e: materialize.checks.executors.Executor) -> None:
326    def execute(self, e: Executor) -> None:
327        c = e.mzcompose_composition()
328
329        c.kill(c.metadata_store())
330        c.up(c.metadata_store())
Inherited Members
MzcomposeAction
join
class RestartSourcePostgres(MzcomposeAction):
class RestartSourcePostgres(MzcomposeAction):
    """Kill the ``postgres`` service and bring it back up."""

    def execute(self, e: Executor) -> None:
        composition = e.mzcompose_composition()
        service = "postgres"

        composition.kill(service)
        composition.up(service)
def execute(self, e: materialize.checks.executors.Executor) -> None:
334    def execute(self, e: Executor) -> None:
335        c = e.mzcompose_composition()
336
337        c.kill("postgres")
338        c.up("postgres")
Inherited Members
MzcomposeAction
join
class KillClusterdStorage(MzcomposeAction):
class KillClusterdStorage(MzcomposeAction):
    """Send SIGKILL to any ``clusterd`` process via the ``materialized`` service."""

    def execute(self, e: Executor) -> None:
        composition = e.mzcompose_composition()

        # Depending on the workload, clusterd may not be running, hence the || true
        kill_command = "kill -9 `pidof clusterd` || true"
        composition.exec("materialized", "bash", "-c", kill_command)
def execute(self, e: materialize.checks.executors.Executor) -> None:
342    def execute(self, e: Executor) -> None:
343        c = e.mzcompose_composition()
344
345        # Depending on the workload, clusterd may not be running, hence the || true
346        c.exec("materialized", "bash", "-c", "kill -9 `pidof clusterd` || true")
Inherited Members
MzcomposeAction
join
class DropCreateDefaultReplica(MzcomposeAction):
class DropCreateDefaultReplica(MzcomposeAction):
    """Drop ``quickstart.r1`` and recreate it with size ``scale=1,workers=1``."""

    def __init__(self, scenario: "Scenario") -> None:
        # Recorded from the scenario; execute() below does not read it.
        self.base_version = scenario.base_version()

    def execute(self, e: Executor) -> None:
        composition = e.mzcompose_composition()

        recreate_replica = """
            ALTER CLUSTER quickstart SET (MANAGED = false);
            DROP CLUSTER REPLICA quickstart.r1;
            CREATE CLUSTER REPLICA quickstart.r1 SIZE 'scale=1,workers=1';
            """

        # Issued as mz_system on port 6877.
        composition.sql(recreate_replica, port=6877, user="mz_system")
DropCreateDefaultReplica(scenario: materialize.checks.scenarios.Scenario)
350    def __init__(self, scenario: "Scenario") -> None:
351        self.base_version = scenario.base_version()
base_version
def execute(self, e: materialize.checks.executors.Executor) -> None:
353    def execute(self, e: Executor) -> None:
354        c = e.mzcompose_composition()
355
356        c.sql(
357            """
358            ALTER CLUSTER quickstart SET (MANAGED = false);
359            DROP CLUSTER REPLICA quickstart.r1;
360            CREATE CLUSTER REPLICA quickstart.r1 SIZE 'scale=1,workers=1';
361            """,
362            port=6877,
363            user="mz_system",
364        )
Inherited Members
MzcomposeAction
join
class WaitReadyMz(MzcomposeAction):
class WaitReadyMz(MzcomposeAction):
    """Wait until environmentd is ready, see https://github.com/MaterializeInc/cloud/blob/main/doc/design/20230418_upgrade_orchestration.md#get-apileaderstatus"""

    def __init__(self, mz_service: str = "materialized") -> None:
        # Service whose deployment status execute() waits on.
        self.mz_service = mz_service

    def execute(self, e: Executor) -> None:
        """Await the ``READY_TO_PROMOTE`` deployment status for the service."""
        composition = e.mzcompose_composition()
        composition.await_mz_deployment_status(
            DeploymentStatus.READY_TO_PROMOTE, self.mz_service
        )
WaitReadyMz(mz_service: str = 'materialized')
370    def __init__(self, mz_service: str = "materialized") -> None:
371        self.mz_service = mz_service
mz_service
def execute(self, e: materialize.checks.executors.Executor) -> None:
373    def execute(self, e: Executor) -> None:
374        e.mzcompose_composition().await_mz_deployment_status(
375            DeploymentStatus.READY_TO_PROMOTE, self.mz_service
376        )
Inherited Members
MzcomposeAction
join
class PromoteMz(MzcomposeAction):
class PromoteMz(MzcomposeAction):
    """Promote environmentd to leader, see https://github.com/MaterializeInc/cloud/blob/main/doc/design/20230418_upgrade_orchestration.md#post-apileaderpromote"""

    def __init__(self, mz_service: str = "materialized") -> None:
        # Service that receives the promote request.
        self.mz_service = mz_service

    def execute(self, e: Executor) -> None:
        """POST to the promote endpoint, wait for leadership, record the version."""
        composition = e.mzcompose_composition()
        service = self.mz_service

        # The endpoint's reply is read from curl's captured stdout as JSON.
        promote_call = composition.exec(
            service,
            "curl",
            "-s",
            "-X",
            "POST",
            "http://127.0.0.1:6878/api/leader/promote",
            capture=True,
        )
        result = json.loads(promote_call.stdout)
        assert result["result"] == "Success", f"Unexpected result {result}"

        # Wait until new Materialize is ready to handle queries
        composition.await_mz_deployment_status(DeploymentStatus.IS_LEADER, service)

        # Record the version the promoted service reports on the executor.
        reported_version = composition.query_mz_version(service=service)
        e.current_mz_version = MzVersion.parse_mz(reported_version)
PromoteMz(mz_service: str = 'materialized')
382    def __init__(self, mz_service: str = "materialized") -> None:
383        self.mz_service = mz_service
mz_service
def execute(self, e: materialize.checks.executors.Executor) -> None:
385    def execute(self, e: Executor) -> None:
386        c = e.mzcompose_composition()
387
388        result = json.loads(
389            c.exec(
390                self.mz_service,
391                "curl",
392                "-s",
393                "-X",
394                "POST",
395                "http://127.0.0.1:6878/api/leader/promote",
396                capture=True,
397            ).stdout
398        )
399        assert result["result"] == "Success", f"Unexpected result {result}"
400
401        # Wait until new Materialize is ready to handle queries
402        c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, self.mz_service)
403
404        mz_version = MzVersion.parse_mz(c.query_mz_version(service=self.mz_service))
405        e.current_mz_version = mz_version
Inherited Members
MzcomposeAction
join
class SystemVarChange(MzcomposeAction):
class SystemVarChange(MzcomposeAction):
    """Changes a system var."""

    def __init__(self, name: str, value: str) -> None:
        """Store the variable name and the value to assign.

        Both are interpolated verbatim into the ``ALTER SYSTEM SET`` statement
        built by ``execute``, so ``value`` must already be valid SQL for the
        right-hand side (quoted by the caller where needed).
        """
        self.name = name
        self.value = value

    def execute(self, e: Executor) -> None:
        """Issue ``ALTER SYSTEM SET <name> = <value>;`` as mz_system on port 6877."""
        c = e.mzcompose_composition()

        c.sql(
            f"ALTER SYSTEM SET {self.name} = {self.value};",
            port=6877,
            user="mz_system",
        )

Changes a system var.

SystemVarChange(name: str, value: str)
411    def __init__(self, name: str, value: str):
412        self.name = name
413        self.value = value
name
value
def execute(self, e: materialize.checks.executors.Executor) -> None:
415    def execute(self, e: Executor) -> None:
416        c = e.mzcompose_composition()
417
418        c.sql(
419            f"ALTER SYSTEM SET {self.name} = {self.value};",
420            port=6877,
421            user="mz_system",
422        )
Inherited Members
MzcomposeAction
join