misc.python.materialize.checks.mzcompose_actions
1# Copyright Materialize, Inc. and contributors. All rights reserved. 2# 3# Use of this software is governed by the Business Source License 4# included in the LICENSE file at the root of this repository. 5# 6# As of the Change Date specified in that file, in accordance with 7# the Business Source License, use of this software will be governed 8# by the Apache License, Version 2.0. 9 10import json 11from textwrap import dedent 12from typing import TYPE_CHECKING, Any 13 14from materialize import MZ_ROOT 15from materialize.checks.actions import Action 16from materialize.checks.executors import Executor 17from materialize.docker import image_registry 18from materialize.mz_version import MzVersion 19from materialize.mzcompose.services.clusterd import Clusterd 20from materialize.mzcompose.services.materialized import DeploymentStatus, Materialized 21from materialize.mzcompose.services.ssh_bastion_host import ( 22 setup_default_ssh_test_connection, 23) 24 25if TYPE_CHECKING: 26 from materialize.checks.scenarios import Scenario 27 28 29class MzcomposeAction(Action): 30 def join(self, e: Executor) -> None: 31 # Most of these actions are already blocking 32 pass 33 34 35class StartMz(MzcomposeAction): 36 def __init__( 37 self, 38 scenario: "Scenario", 39 tag: MzVersion | None = None, 40 environment_extra: list[str] = [], 41 system_parameter_defaults: dict[str, str] | None = None, 42 additional_system_parameter_defaults: dict[str, str] = {}, 43 system_parameter_version: MzVersion | None = None, 44 mz_service: str | None = None, 45 platform: str | None = None, 46 healthcheck: list[str] | None = None, 47 deploy_generation: int | None = None, 48 restart: str | None = None, 49 force_migrations: str | None = None, 50 publish: bool | None = None, 51 ) -> None: 52 if healthcheck is None: 53 healthcheck = ["CMD", "curl", "-f", "localhost:6878/api/readyz"] 54 self.tag = tag 55 self.environment_extra = environment_extra 56 self.system_parameter_defaults = system_parameter_defaults 57 self.additional_system_parameter_defaults = additional_system_parameter_defaults 58 self.system_parameter_version = system_parameter_version or tag 59 self.healthcheck = healthcheck 60 self.mz_service = mz_service 61 self.platform = platform 62 self.deploy_generation = deploy_generation 63 self.restart = restart 64 self.force_migrations = force_migrations 65 self.publish = publish 66 self.scenario = scenario 67 68 def execute(self, e: Executor) -> None: 69 c = e.mzcompose_composition() 70 71 image = ( 72 f"{image_registry()}/materialized:{self.tag}" 73 if self.tag is not None 74 else None 75 ) 76 print(f"Starting Mz using image {image}, mz_service {self.mz_service}") 77 78 listeners_config_path = ( 79 f"{MZ_ROOT}/src/materialized/ci/listener_configs/testdrive.json" 80 ) 81 82 if not self.tag or self.tag >= MzVersion.parse_mz("v0.158.0-dev"): 83 listeners_config_path = ( 84 f"{MZ_ROOT}/src/materialized/ci/listener_configs/testdrive_sasl.json" 85 ) 86 87 mz = Materialized( 88 name=self.mz_service, 89 image=image, 90 external_metadata_store=True, 91 external_blob_store=True, 92 blob_store_is_azure=self.scenario.features.azurite_enabled(), 93 environment_extra=self.environment_extra, 94 system_parameter_defaults=self.system_parameter_defaults, 95 additional_system_parameter_defaults=self.additional_system_parameter_defaults, 96 system_parameter_version=self.system_parameter_version, 97 sanity_restart=False, 98 platform=self.platform, 99 healthcheck=self.healthcheck, 100 deploy_generation=self.deploy_generation, 101 restart=self.restart, 102 force_migrations=self.force_migrations, 103 publish=self.publish, 104 default_replication_factor=2, 105 support_external_clusterd=True, 106 listeners_config_path=listeners_config_path, 107 ) 108 109 # Don't fail since we are careful to explicitly kill and collect logs 110 # of the services thus started 111 with c.override(mz, fail_on_new_service=False): 112 c.up("materialized" if self.mz_service is None else self.mz_service) 113 114 # If we start up Materialize with a deploy-generation , then it 115 # stays in a stuck state when the preflight-check is completed. So 116 # we can't connect to it yet to run any commands. 117 if self.deploy_generation: 118 return 119 120 # This should live in ssh.py and alter_connection.py, but accessing the 121 # ssh bastion host from inside a check is not possible currently. 122 for i in range(4): 123 ssh_tunnel_name = f"ssh_tunnel_{i}" 124 setup_default_ssh_test_connection( 125 c, ssh_tunnel_name, mz_service=self.mz_service 126 ) 127 128 mz_version = MzVersion.parse_mz(c.query_mz_version(service=self.mz_service)) 129 if self.tag: 130 assert ( 131 self.tag == mz_version 132 ), f"Materialize version mismatch, expected {self.tag}, but got {mz_version}" 133 else: 134 version_cargo = MzVersion.parse_cargo() 135 assert ( 136 version_cargo == mz_version 137 ), f"Materialize version mismatch, expected {version_cargo}, but got {mz_version}" 138 139 e.current_mz_version = mz_version 140 141 142class ConfigureMz(MzcomposeAction): 143 def __init__(self, scenario: "Scenario", mz_service: str | None = None) -> None: 144 self.handle: Any | None = None 145 self.mz_service = mz_service 146 self.scenario = scenario 147 148 def execute(self, e: Executor) -> None: 149 input = dedent( 150 """ 151 # Run any query to have the materialize user implicitly created if 152 # it didn't exist yet. Required for the GRANT later. 153 > SELECT 1; 154 1 155 """ 156 ) 157 158 system_settings = { 159 "ALTER SYSTEM SET max_tables = 1000;", 160 "ALTER SYSTEM SET max_sinks = 1000;", 161 "ALTER SYSTEM SET max_sources = 1000;", 162 "ALTER SYSTEM SET max_materialized_views = 1000;", 163 "ALTER SYSTEM SET max_objects_per_schema = 1000;", 164 "ALTER SYSTEM SET max_secrets = 1000;", 165 "ALTER SYSTEM SET max_clusters = 1000;", 166 } 167 168 # Since we already test with RBAC enabled, we have to give materialize 169 # user the relevant attributes so the existing tests keep working. 170 system_settings.add("GRANT ALL PRIVILEGES ON SYSTEM TO materialize;") 171 172 # do not enable this by default for all checks 173 system_settings.add("ALTER SYSTEM SET enable_rbac_checks TO false;") 174 175 # Since we already test with RBAC enabled, we have to give materialize 176 # user the relevant privileges so the existing tests keep working. 177 system_settings.add("GRANT CREATE ON DATABASE materialize TO materialize;") 178 system_settings.add("GRANT CREATE ON SCHEMA materialize.public TO materialize;") 179 system_settings.add("GRANT CREATE ON CLUSTER quickstart TO materialize;") 180 181 system_settings = system_settings - e.system_settings 182 183 if system_settings: 184 input += ( 185 "$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}\n" 186 + "\n".join(system_settings) 187 ) 188 189 kafka_broker = "BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT" 190 input += dedent( 191 f""" 192 > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA {kafka_broker} 193 194 > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}'; 195 """ 196 ) 197 198 self.handle = e.testdrive(input=input, mz_service=self.mz_service) 199 e.system_settings.update(system_settings) 200 201 def join(self, e: Executor) -> None: 202 e.join(self.handle) 203 204 205class SetupSqlServerTesting(MzcomposeAction): 206 def __init__(self, scenario: "Scenario", mz_service: str | None = None) -> None: 207 self.handle: Any | None = None 208 self.mz_service = mz_service 209 self.scenario = scenario 210 211 def execute(self, e: Executor) -> None: 212 with open(MZ_ROOT / "test" / "sql-server-cdc" / "setup" / "setup.td") as f: 213 self.handle = e.testdrive(input=f.read(), mz_service=self.mz_service) 214 215 def join(self, e: Executor) -> None: 216 e.join(self.handle) 217 218 219class KillMz(MzcomposeAction): 220 def __init__( 221 self, mz_service: str = "materialized", capture_logs: bool = False 222 ) -> None: 223 self.mz_service = mz_service 224 self.capture_logs = capture_logs 225 226 def execute(self, e: Executor) -> None: 227 c = e.mzcompose_composition() 228 229 # Don't fail since we are careful to explicitly kill and collect logs 230 # of the services thus started 231 with c.override(Materialized(name=self.mz_service), fail_on_new_service=False): 232 c.kill(self.mz_service, wait=True) 233 234 if self.capture_logs: 235 c.capture_logs(self.mz_service) 236 237 238class Stop(MzcomposeAction): 239 def __init__(self, service: str = "materialized") -> None: 240 self.service = service 241 242 def execute(self, e: Executor) -> None: 243 c = e.mzcompose_composition() 244 c.stop(self.service, wait=True) 245 246 247class Down(MzcomposeAction): 248 def execute(self, e: Executor) -> None: 249 c = e.mzcompose_composition() 250 c.down() 251 252 253class UseClusterdCompute(MzcomposeAction): 254 def __init__(self, scenario: "Scenario") -> None: 255 self.base_version = scenario.base_version() 256 257 def execute(self, e: Executor) -> None: 258 c = e.mzcompose_composition() 259 260 c.sql( 261 "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = on;", 262 port=6877, 263 user="mz_system", 264 ) 265 c.sql( 266 """ 267 ALTER CLUSTER quickstart SET (MANAGED = false); 268 DROP CLUSTER REPLICA quickstart.r1; 269 CREATE CLUSTER REPLICA quickstart.r1 270 STORAGECTL ADDRESSES ['clusterd_compute_1:2100'], 271 STORAGE ADDRESSES ['clusterd_compute_1:2103'], 272 COMPUTECTL ADDRESSES ['clusterd_compute_1:2101'], 273 COMPUTE ADDRESSES ['clusterd_compute_1:2102'], 274 WORKERS 1; 275 """, 276 port=6877, 277 user="mz_system", 278 ) 279 280 281class KillClusterdCompute(MzcomposeAction): 282 def __init__(self, capture_logs: bool = False) -> None: 283 self.capture_logs = capture_logs 284 285 def execute(self, e: Executor) -> None: 286 c = e.mzcompose_composition() 287 with c.override(Clusterd(name="clusterd_compute_1")): 288 c.kill("clusterd_compute_1") 289 290 if self.capture_logs: 291 c.capture_logs("clusterd_compute_1") 292 293 294class StartClusterdCompute(MzcomposeAction): 295 def __init__(self, tag: MzVersion | None = None) -> None: 296 self.tag = tag 297 298 def execute(self, e: Executor) -> None: 299 c = e.mzcompose_composition() 300 301 clusterd = Clusterd(name="clusterd_compute_1") 302 if self.tag: 303 clusterd = Clusterd( 304 name="clusterd_compute_1", 305 image=f"materialize/clusterd:{self.tag}", 306 ) 307 print(f"Starting Compute using image {clusterd.config.get('image')}") 308 309 with c.override(clusterd): 310 c.up("clusterd_compute_1") 311 312 313class RestartRedpandaDebezium(MzcomposeAction): 314 """Restarts Redpanda and Debezium. Debezium is unable to survive Redpanda restarts so the two go together.""" 315 316 def execute(self, e: Executor) -> None: 317 c = e.mzcompose_composition() 318 319 for service in ["redpanda", "debezium"]: 320 c.kill(service) 321 c.up(service) 322 323 324class RestartCockroach(MzcomposeAction): 325 def execute(self, e: Executor) -> None: 326 c = e.mzcompose_composition() 327 328 c.kill(c.metadata_store()) 329 c.up(c.metadata_store()) 330 331 332class RestartSourcePostgres(MzcomposeAction): 333 def execute(self, e: Executor) -> None: 334 c = e.mzcompose_composition() 335 336 c.kill("postgres") 337 c.up("postgres") 338 339 340class KillClusterdStorage(MzcomposeAction): 341 def execute(self, e: Executor) -> None: 342 c = e.mzcompose_composition() 343 344 # Depending on the workload, clusterd may not be running, hence the || true 345 c.exec("materialized", "bash", "-c", "kill -9 `pidof clusterd` || true") 346 347 348class DropCreateDefaultReplica(MzcomposeAction): 349 def __init__(self, scenario: "Scenario") -> None: 350 self.base_version = scenario.base_version() 351 352 def execute(self, e: Executor) -> None: 353 c = e.mzcompose_composition() 354 355 c.sql( 356 """ 357 ALTER CLUSTER quickstart SET (MANAGED = false); 358 DROP CLUSTER REPLICA quickstart.r1; 359 CREATE CLUSTER REPLICA quickstart.r1 SIZE 'scale=1,workers=1'; 360 """, 361 port=6877, 362 user="mz_system", 363 ) 364 365 366class WaitReadyMz(MzcomposeAction): 367 """Wait until environmentd is ready, see https://github.com/MaterializeInc/cloud/blob/main/doc/design/20230418_upgrade_orchestration.md#get-apileaderstatus""" 368 369 def __init__(self, mz_service: str = "materialized") -> None: 370 self.mz_service = mz_service 371 372 def execute(self, e: Executor) -> None: 373 e.mzcompose_composition().await_mz_deployment_status( 374 DeploymentStatus.READY_TO_PROMOTE, self.mz_service 375 ) 376 377 378class PromoteMz(MzcomposeAction): 379 """Promote environmentd to leader, see https://github.com/MaterializeInc/cloud/blob/main/doc/design/20230418_upgrade_orchestration.md#post-apileaderpromote""" 380 381 def __init__(self, mz_service: str = "materialized") -> None: 382 self.mz_service = mz_service 383 384 def execute(self, e: Executor) -> None: 385 c = e.mzcompose_composition() 386 387 result = json.loads( 388 c.exec( 389 self.mz_service, 390 "curl", 391 "-s", 392 "-X", 393 "POST", 394 "http://127.0.0.1:6878/api/leader/promote", 395 capture=True, 396 ).stdout 397 ) 398 assert result["result"] == "Success", f"Unexpected result {result}" 399 400 # Wait until new Materialize is ready to handle queries 401 c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, self.mz_service) 402 403 mz_version = MzVersion.parse_mz(c.query_mz_version(service=self.mz_service)) 404 e.current_mz_version = mz_version 405 406 407class SystemVarChange(MzcomposeAction): 408 """Changes a system var.""" 409 410 def __init__(self, name: str, value: str): 411 self.name = name 412 self.value = value 413 414 def execute(self, e: Executor) -> None: 415 c = e.mzcompose_composition() 416 417 c.sql( 418 f"ALTER SYSTEM SET {self.name} = {self.value};", 419 port=6877, 420 user="mz_system", 421 )
class
MzcomposeAction(materialize.checks.actions.Action):
36class StartMz(MzcomposeAction): 37 def __init__( 38 self, 39 scenario: "Scenario", 40 tag: MzVersion | None = None, 41 environment_extra: list[str] = [], 42 system_parameter_defaults: dict[str, str] | None = None, 43 additional_system_parameter_defaults: dict[str, str] = {}, 44 system_parameter_version: MzVersion | None = None, 45 mz_service: str | None = None, 46 platform: str | None = None, 47 healthcheck: list[str] | None = None, 48 deploy_generation: int | None = None, 49 restart: str | None = None, 50 force_migrations: str | None = None, 51 publish: bool | None = None, 52 ) -> None: 53 if healthcheck is None: 54 healthcheck = ["CMD", "curl", "-f", "localhost:6878/api/readyz"] 55 self.tag = tag 56 self.environment_extra = environment_extra 57 self.system_parameter_defaults = system_parameter_defaults 58 self.additional_system_parameter_defaults = additional_system_parameter_defaults 59 self.system_parameter_version = system_parameter_version or tag 60 self.healthcheck = healthcheck 61 self.mz_service = mz_service 62 self.platform = platform 63 self.deploy_generation = deploy_generation 64 self.restart = restart 65 self.force_migrations = force_migrations 66 self.publish = publish 67 self.scenario = scenario 68 69 def execute(self, e: Executor) -> None: 70 c = e.mzcompose_composition() 71 72 image = ( 73 f"{image_registry()}/materialized:{self.tag}" 74 if self.tag is not None 75 else None 76 ) 77 print(f"Starting Mz using image {image}, mz_service {self.mz_service}") 78 79 listeners_config_path = ( 80 f"{MZ_ROOT}/src/materialized/ci/listener_configs/testdrive.json" 81 ) 82 83 if not self.tag or self.tag >= MzVersion.parse_mz("v0.158.0-dev"): 84 listeners_config_path = ( 85 f"{MZ_ROOT}/src/materialized/ci/listener_configs/testdrive_sasl.json" 86 ) 87 88 mz = Materialized( 89 name=self.mz_service, 90 image=image, 91 external_metadata_store=True, 92 external_blob_store=True, 93 blob_store_is_azure=self.scenario.features.azurite_enabled(), 94 environment_extra=self.environment_extra, 95 system_parameter_defaults=self.system_parameter_defaults, 96 additional_system_parameter_defaults=self.additional_system_parameter_defaults, 97 system_parameter_version=self.system_parameter_version, 98 sanity_restart=False, 99 platform=self.platform, 100 healthcheck=self.healthcheck, 101 deploy_generation=self.deploy_generation, 102 restart=self.restart, 103 force_migrations=self.force_migrations, 104 publish=self.publish, 105 default_replication_factor=2, 106 support_external_clusterd=True, 107 listeners_config_path=listeners_config_path, 108 ) 109 110 # Don't fail since we are careful to explicitly kill and collect logs 111 # of the services thus started 112 with c.override(mz, fail_on_new_service=False): 113 c.up("materialized" if self.mz_service is None else self.mz_service) 114 115 # If we start up Materialize with a deploy-generation , then it 116 # stays in a stuck state when the preflight-check is completed. So 117 # we can't connect to it yet to run any commands. 118 if self.deploy_generation: 119 return 120 121 # This should live in ssh.py and alter_connection.py, but accessing the 122 # ssh bastion host from inside a check is not possible currently. 123 for i in range(4): 124 ssh_tunnel_name = f"ssh_tunnel_{i}" 125 setup_default_ssh_test_connection( 126 c, ssh_tunnel_name, mz_service=self.mz_service 127 ) 128 129 mz_version = MzVersion.parse_mz(c.query_mz_version(service=self.mz_service)) 130 if self.tag: 131 assert ( 132 self.tag == mz_version 133 ), f"Materialize version mismatch, expected {self.tag}, but got {mz_version}" 134 else: 135 version_cargo = MzVersion.parse_cargo() 136 assert ( 137 version_cargo == mz_version 138 ), f"Materialize version mismatch, expected {version_cargo}, but got {mz_version}" 139 140 e.current_mz_version = mz_version
StartMz( scenario: materialize.checks.scenarios.Scenario, tag: materialize.mz_version.MzVersion | None = None, environment_extra: list[str] = [], system_parameter_defaults: dict[str, str] | None = None, additional_system_parameter_defaults: dict[str, str] = {}, system_parameter_version: materialize.mz_version.MzVersion | None = None, mz_service: str | None = None, platform: str | None = None, healthcheck: list[str] | None = None, deploy_generation: int | None = None, restart: str | None = None, force_migrations: str | None = None, publish: bool | None = None)
37 def __init__( 38 self, 39 scenario: "Scenario", 40 tag: MzVersion | None = None, 41 environment_extra: list[str] = [], 42 system_parameter_defaults: dict[str, str] | None = None, 43 additional_system_parameter_defaults: dict[str, str] = {}, 44 system_parameter_version: MzVersion | None = None, 45 mz_service: str | None = None, 46 platform: str | None = None, 47 healthcheck: list[str] | None = None, 48 deploy_generation: int | None = None, 49 restart: str | None = None, 50 force_migrations: str | None = None, 51 publish: bool | None = None, 52 ) -> None: 53 if healthcheck is None: 54 healthcheck = ["CMD", "curl", "-f", "localhost:6878/api/readyz"] 55 self.tag = tag 56 self.environment_extra = environment_extra 57 self.system_parameter_defaults = system_parameter_defaults 58 self.additional_system_parameter_defaults = additional_system_parameter_defaults 59 self.system_parameter_version = system_parameter_version or tag 60 self.healthcheck = healthcheck 61 self.mz_service = mz_service 62 self.platform = platform 63 self.deploy_generation = deploy_generation 64 self.restart = restart 65 self.force_migrations = force_migrations 66 self.publish = publish 67 self.scenario = scenario
def
execute(self, e: materialize.checks.executors.Executor) -> None:
69 def execute(self, e: Executor) -> None: 70 c = e.mzcompose_composition() 71 72 image = ( 73 f"{image_registry()}/materialized:{self.tag}" 74 if self.tag is not None 75 else None 76 ) 77 print(f"Starting Mz using image {image}, mz_service {self.mz_service}") 78 79 listeners_config_path = ( 80 f"{MZ_ROOT}/src/materialized/ci/listener_configs/testdrive.json" 81 ) 82 83 if not self.tag or self.tag >= MzVersion.parse_mz("v0.158.0-dev"): 84 listeners_config_path = ( 85 f"{MZ_ROOT}/src/materialized/ci/listener_configs/testdrive_sasl.json" 86 ) 87 88 mz = Materialized( 89 name=self.mz_service, 90 image=image, 91 external_metadata_store=True, 92 external_blob_store=True, 93 blob_store_is_azure=self.scenario.features.azurite_enabled(), 94 environment_extra=self.environment_extra, 95 system_parameter_defaults=self.system_parameter_defaults, 96 additional_system_parameter_defaults=self.additional_system_parameter_defaults, 97 system_parameter_version=self.system_parameter_version, 98 sanity_restart=False, 99 platform=self.platform, 100 healthcheck=self.healthcheck, 101 deploy_generation=self.deploy_generation, 102 restart=self.restart, 103 force_migrations=self.force_migrations, 104 publish=self.publish, 105 default_replication_factor=2, 106 support_external_clusterd=True, 107 listeners_config_path=listeners_config_path, 108 ) 109 110 # Don't fail since we are careful to explicitly kill and collect logs 111 # of the services thus started 112 with c.override(mz, fail_on_new_service=False): 113 c.up("materialized" if self.mz_service is None else self.mz_service) 114 115 # If we start up Materialize with a deploy-generation , then it 116 # stays in a stuck state when the preflight-check is completed. So 117 # we can't connect to it yet to run any commands. 118 if self.deploy_generation: 119 return 120 121 # This should live in ssh.py and alter_connection.py, but accessing the 122 # ssh bastion host from inside a check is not possible currently. 123 for i in range(4): 124 ssh_tunnel_name = f"ssh_tunnel_{i}" 125 setup_default_ssh_test_connection( 126 c, ssh_tunnel_name, mz_service=self.mz_service 127 ) 128 129 mz_version = MzVersion.parse_mz(c.query_mz_version(service=self.mz_service)) 130 if self.tag: 131 assert ( 132 self.tag == mz_version 133 ), f"Materialize version mismatch, expected {self.tag}, but got {mz_version}" 134 else: 135 version_cargo = MzVersion.parse_cargo() 136 assert ( 137 version_cargo == mz_version 138 ), f"Materialize version mismatch, expected {version_cargo}, but got {mz_version}" 139 140 e.current_mz_version = mz_version
Inherited Members
143class ConfigureMz(MzcomposeAction): 144 def __init__(self, scenario: "Scenario", mz_service: str | None = None) -> None: 145 self.handle: Any | None = None 146 self.mz_service = mz_service 147 self.scenario = scenario 148 149 def execute(self, e: Executor) -> None: 150 input = dedent( 151 """ 152 # Run any query to have the materialize user implicitly created if 153 # it didn't exist yet. Required for the GRANT later. 154 > SELECT 1; 155 1 156 """ 157 ) 158 159 system_settings = { 160 "ALTER SYSTEM SET max_tables = 1000;", 161 "ALTER SYSTEM SET max_sinks = 1000;", 162 "ALTER SYSTEM SET max_sources = 1000;", 163 "ALTER SYSTEM SET max_materialized_views = 1000;", 164 "ALTER SYSTEM SET max_objects_per_schema = 1000;", 165 "ALTER SYSTEM SET max_secrets = 1000;", 166 "ALTER SYSTEM SET max_clusters = 1000;", 167 } 168 169 # Since we already test with RBAC enabled, we have to give materialize 170 # user the relevant attributes so the existing tests keep working. 171 system_settings.add("GRANT ALL PRIVILEGES ON SYSTEM TO materialize;") 172 173 # do not enable this by default for all checks 174 system_settings.add("ALTER SYSTEM SET enable_rbac_checks TO false;") 175 176 # Since we already test with RBAC enabled, we have to give materialize 177 # user the relevant privileges so the existing tests keep working. 178 system_settings.add("GRANT CREATE ON DATABASE materialize TO materialize;") 179 system_settings.add("GRANT CREATE ON SCHEMA materialize.public TO materialize;") 180 system_settings.add("GRANT CREATE ON CLUSTER quickstart TO materialize;") 181 182 system_settings = system_settings - e.system_settings 183 184 if system_settings: 185 input += ( 186 "$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}\n" 187 + "\n".join(system_settings) 188 ) 189 190 kafka_broker = "BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT" 191 input += dedent( 192 f""" 193 > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA {kafka_broker} 194 195 > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}'; 196 """ 197 ) 198 199 self.handle = e.testdrive(input=input, mz_service=self.mz_service) 200 e.system_settings.update(system_settings) 201 202 def join(self, e: Executor) -> None: 203 e.join(self.handle)
def
execute(self, e: materialize.checks.executors.Executor) -> None:
149 def execute(self, e: Executor) -> None: 150 input = dedent( 151 """ 152 # Run any query to have the materialize user implicitly created if 153 # it didn't exist yet. Required for the GRANT later. 154 > SELECT 1; 155 1 156 """ 157 ) 158 159 system_settings = { 160 "ALTER SYSTEM SET max_tables = 1000;", 161 "ALTER SYSTEM SET max_sinks = 1000;", 162 "ALTER SYSTEM SET max_sources = 1000;", 163 "ALTER SYSTEM SET max_materialized_views = 1000;", 164 "ALTER SYSTEM SET max_objects_per_schema = 1000;", 165 "ALTER SYSTEM SET max_secrets = 1000;", 166 "ALTER SYSTEM SET max_clusters = 1000;", 167 } 168 169 # Since we already test with RBAC enabled, we have to give materialize 170 # user the relevant attributes so the existing tests keep working. 171 system_settings.add("GRANT ALL PRIVILEGES ON SYSTEM TO materialize;") 172 173 # do not enable this by default for all checks 174 system_settings.add("ALTER SYSTEM SET enable_rbac_checks TO false;") 175 176 # Since we already test with RBAC enabled, we have to give materialize 177 # user the relevant privileges so the existing tests keep working. 178 system_settings.add("GRANT CREATE ON DATABASE materialize TO materialize;") 179 system_settings.add("GRANT CREATE ON SCHEMA materialize.public TO materialize;") 180 system_settings.add("GRANT CREATE ON CLUSTER quickstart TO materialize;") 181 182 system_settings = system_settings - e.system_settings 183 184 if system_settings: 185 input += ( 186 "$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}\n" 187 + "\n".join(system_settings) 188 ) 189 190 kafka_broker = "BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT" 191 input += dedent( 192 f""" 193 > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA {kafka_broker} 194 195 > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}'; 196 """ 197 ) 198 199 self.handle = e.testdrive(input=input, mz_service=self.mz_service) 200 e.system_settings.update(system_settings)
206class SetupSqlServerTesting(MzcomposeAction): 207 def __init__(self, scenario: "Scenario", mz_service: str | None = None) -> None: 208 self.handle: Any | None = None 209 self.mz_service = mz_service 210 self.scenario = scenario 211 212 def execute(self, e: Executor) -> None: 213 with open(MZ_ROOT / "test" / "sql-server-cdc" / "setup" / "setup.td") as f: 214 self.handle = e.testdrive(input=f.read(), mz_service=self.mz_service) 215 216 def join(self, e: Executor) -> None: 217 e.join(self.handle)
SetupSqlServerTesting( scenario: materialize.checks.scenarios.Scenario, mz_service: str | None = None)
220class KillMz(MzcomposeAction): 221 def __init__( 222 self, mz_service: str = "materialized", capture_logs: bool = False 223 ) -> None: 224 self.mz_service = mz_service 225 self.capture_logs = capture_logs 226 227 def execute(self, e: Executor) -> None: 228 c = e.mzcompose_composition() 229 230 # Don't fail since we are careful to explicitly kill and collect logs 231 # of the services thus started 232 with c.override(Materialized(name=self.mz_service), fail_on_new_service=False): 233 c.kill(self.mz_service, wait=True) 234 235 if self.capture_logs: 236 c.capture_logs(self.mz_service)
def
execute(self, e: materialize.checks.executors.Executor) -> None:
227 def execute(self, e: Executor) -> None: 228 c = e.mzcompose_composition() 229 230 # Don't fail since we are careful to explicitly kill and collect logs 231 # of the services thus started 232 with c.override(Materialized(name=self.mz_service), fail_on_new_service=False): 233 c.kill(self.mz_service, wait=True) 234 235 if self.capture_logs: 236 c.capture_logs(self.mz_service)
Inherited Members
239class Stop(MzcomposeAction): 240 def __init__(self, service: str = "materialized") -> None: 241 self.service = service 242 243 def execute(self, e: Executor) -> None: 244 c = e.mzcompose_composition() 245 c.stop(self.service, wait=True)
Inherited Members
248class Down(MzcomposeAction): 249 def execute(self, e: Executor) -> None: 250 c = e.mzcompose_composition() 251 c.down()
Inherited Members
254class UseClusterdCompute(MzcomposeAction): 255 def __init__(self, scenario: "Scenario") -> None: 256 self.base_version = scenario.base_version() 257 258 def execute(self, e: Executor) -> None: 259 c = e.mzcompose_composition() 260 261 c.sql( 262 "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = on;", 263 port=6877, 264 user="mz_system", 265 ) 266 c.sql( 267 """ 268 ALTER CLUSTER quickstart SET (MANAGED = false); 269 DROP CLUSTER REPLICA quickstart.r1; 270 CREATE CLUSTER REPLICA quickstart.r1 271 STORAGECTL ADDRESSES ['clusterd_compute_1:2100'], 272 STORAGE ADDRESSES ['clusterd_compute_1:2103'], 273 COMPUTECTL ADDRESSES ['clusterd_compute_1:2101'], 274 COMPUTE ADDRESSES ['clusterd_compute_1:2102'], 275 WORKERS 1; 276 """, 277 port=6877, 278 user="mz_system", 279 )
def
execute(self, e: materialize.checks.executors.Executor) -> None:
258 def execute(self, e: Executor) -> None: 259 c = e.mzcompose_composition() 260 261 c.sql( 262 "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = on;", 263 port=6877, 264 user="mz_system", 265 ) 266 c.sql( 267 """ 268 ALTER CLUSTER quickstart SET (MANAGED = false); 269 DROP CLUSTER REPLICA quickstart.r1; 270 CREATE CLUSTER REPLICA quickstart.r1 271 STORAGECTL ADDRESSES ['clusterd_compute_1:2100'], 272 STORAGE ADDRESSES ['clusterd_compute_1:2103'], 273 COMPUTECTL ADDRESSES ['clusterd_compute_1:2101'], 274 COMPUTE ADDRESSES ['clusterd_compute_1:2102'], 275 WORKERS 1; 276 """, 277 port=6877, 278 user="mz_system", 279 )
Inherited Members
282class KillClusterdCompute(MzcomposeAction): 283 def __init__(self, capture_logs: bool = False) -> None: 284 self.capture_logs = capture_logs 285 286 def execute(self, e: Executor) -> None: 287 c = e.mzcompose_composition() 288 with c.override(Clusterd(name="clusterd_compute_1")): 289 c.kill("clusterd_compute_1") 290 291 if self.capture_logs: 292 c.capture_logs("clusterd_compute_1")
Inherited Members
295class StartClusterdCompute(MzcomposeAction): 296 def __init__(self, tag: MzVersion | None = None) -> None: 297 self.tag = tag 298 299 def execute(self, e: Executor) -> None: 300 c = e.mzcompose_composition() 301 302 clusterd = Clusterd(name="clusterd_compute_1") 303 if self.tag: 304 clusterd = Clusterd( 305 name="clusterd_compute_1", 306 image=f"materialize/clusterd:{self.tag}", 307 ) 308 print(f"Starting Compute using image {clusterd.config.get('image')}") 309 310 with c.override(clusterd): 311 c.up("clusterd_compute_1")
def
execute(self, e: materialize.checks.executors.Executor) -> None:
299 def execute(self, e: Executor) -> None: 300 c = e.mzcompose_composition() 301 302 clusterd = Clusterd(name="clusterd_compute_1") 303 if self.tag: 304 clusterd = Clusterd( 305 name="clusterd_compute_1", 306 image=f"materialize/clusterd:{self.tag}", 307 ) 308 print(f"Starting Compute using image {clusterd.config.get('image')}") 309 310 with c.override(clusterd): 311 c.up("clusterd_compute_1")
Inherited Members
314class RestartRedpandaDebezium(MzcomposeAction): 315 """Restarts Redpanda and Debezium. Debezium is unable to survive Redpanda restarts so the two go together.""" 316 317 def execute(self, e: Executor) -> None: 318 c = e.mzcompose_composition() 319 320 for service in ["redpanda", "debezium"]: 321 c.kill(service) 322 c.up(service)
Restarts Redpanda and Debezium. Debezium is unable to survive Redpanda restarts so the two go together.
Inherited Members
325class RestartCockroach(MzcomposeAction): 326 def execute(self, e: Executor) -> None: 327 c = e.mzcompose_composition() 328 329 c.kill(c.metadata_store()) 330 c.up(c.metadata_store())
Inherited Members
333class RestartSourcePostgres(MzcomposeAction): 334 def execute(self, e: Executor) -> None: 335 c = e.mzcompose_composition() 336 337 c.kill("postgres") 338 c.up("postgres")
Inherited Members
341class KillClusterdStorage(MzcomposeAction): 342 def execute(self, e: Executor) -> None: 343 c = e.mzcompose_composition() 344 345 # Depending on the workload, clusterd may not be running, hence the || true 346 c.exec("materialized", "bash", "-c", "kill -9 `pidof clusterd` || true")
Inherited Members
349class DropCreateDefaultReplica(MzcomposeAction): 350 def __init__(self, scenario: "Scenario") -> None: 351 self.base_version = scenario.base_version() 352 353 def execute(self, e: Executor) -> None: 354 c = e.mzcompose_composition() 355 356 c.sql( 357 """ 358 ALTER CLUSTER quickstart SET (MANAGED = false); 359 DROP CLUSTER REPLICA quickstart.r1; 360 CREATE CLUSTER REPLICA quickstart.r1 SIZE 'scale=1,workers=1'; 361 """, 362 port=6877, 363 user="mz_system", 364 )
def
execute(self, e: materialize.checks.executors.Executor) -> None:
353 def execute(self, e: Executor) -> None: 354 c = e.mzcompose_composition() 355 356 c.sql( 357 """ 358 ALTER CLUSTER quickstart SET (MANAGED = false); 359 DROP CLUSTER REPLICA quickstart.r1; 360 CREATE CLUSTER REPLICA quickstart.r1 SIZE 'scale=1,workers=1'; 361 """, 362 port=6877, 363 user="mz_system", 364 )
Inherited Members
367class WaitReadyMz(MzcomposeAction): 368 """Wait until environmentd is ready, see https://github.com/MaterializeInc/cloud/blob/main/doc/design/20230418_upgrade_orchestration.md#get-apileaderstatus""" 369 370 def __init__(self, mz_service: str = "materialized") -> None: 371 self.mz_service = mz_service 372 373 def execute(self, e: Executor) -> None: 374 e.mzcompose_composition().await_mz_deployment_status( 375 DeploymentStatus.READY_TO_PROMOTE, self.mz_service 376 )
Wait until environmentd is ready, see https://github.com/MaterializeInc/cloud/blob/main/doc/design/20230418_upgrade_orchestration.md#get-apileaderstatus
Inherited Members
379class PromoteMz(MzcomposeAction): 380 """Promote environmentd to leader, see https://github.com/MaterializeInc/cloud/blob/main/doc/design/20230418_upgrade_orchestration.md#post-apileaderpromote""" 381 382 def __init__(self, mz_service: str = "materialized") -> None: 383 self.mz_service = mz_service 384 385 def execute(self, e: Executor) -> None: 386 c = e.mzcompose_composition() 387 388 result = json.loads( 389 c.exec( 390 self.mz_service, 391 "curl", 392 "-s", 393 "-X", 394 "POST", 395 "http://127.0.0.1:6878/api/leader/promote", 396 capture=True, 397 ).stdout 398 ) 399 assert result["result"] == "Success", f"Unexpected result {result}" 400 401 # Wait until new Materialize is ready to handle queries 402 c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, self.mz_service) 403 404 mz_version = MzVersion.parse_mz(c.query_mz_version(service=self.mz_service)) 405 e.current_mz_version = mz_version
Promote environmentd to leader, see https://github.com/MaterializeInc/cloud/blob/main/doc/design/20230418_upgrade_orchestration.md#post-apileaderpromote
def
execute(self, e: materialize.checks.executors.Executor) -> None:
385 def execute(self, e: Executor) -> None: 386 c = e.mzcompose_composition() 387 388 result = json.loads( 389 c.exec( 390 self.mz_service, 391 "curl", 392 "-s", 393 "-X", 394 "POST", 395 "http://127.0.0.1:6878/api/leader/promote", 396 capture=True, 397 ).stdout 398 ) 399 assert result["result"] == "Success", f"Unexpected result {result}" 400 401 # Wait until new Materialize is ready to handle queries 402 c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, self.mz_service) 403 404 mz_version = MzVersion.parse_mz(c.query_mz_version(service=self.mz_service)) 405 e.current_mz_version = mz_version
Inherited Members
408class SystemVarChange(MzcomposeAction): 409 """Changes a system var.""" 410 411 def __init__(self, name: str, value: str): 412 self.name = name 413 self.value = value 414 415 def execute(self, e: Executor) -> None: 416 c = e.mzcompose_composition() 417 418 c.sql( 419 f"ALTER SYSTEM SET {self.name} = {self.value};", 420 port=6877, 421 user="mz_system", 422 )
Changes a system var.