misc.python.materialize.zippy.mz_actions
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.


import os

from materialize.mzcompose.composition import Composition
from materialize.mzcompose.services.materialized import (
    LEADER_STATUS_HEALTHCHECK,
    DeploymentStatus,
    Materialized,
)
from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
from materialize.zippy.blob_store_capabilities import BlobStoreIsRunning
from materialize.zippy.crdb_capabilities import CockroachIsRunning
from materialize.zippy.framework import (
    Action,
    ActionFactory,
    Capabilities,
    Capability,
    Mz0dtDeployBaseAction,
    State,
)
from materialize.zippy.mz_capabilities import MzIsRunning
from materialize.zippy.view_capabilities import ViewExists


class MzStartParameterized(ActionFactory):
    """Starts a Mz instance with custom parameters.

    Each call to ``new`` yields a single ``MzStart`` action configured with the
    additional system parameter defaults given to the constructor.
    """

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {CockroachIsRunning, BlobStoreIsRunning}

    @classmethod
    def incompatible_with(cls) -> set[type[Capability]]:
        return {MzIsRunning}

    def __init__(
        self, additional_system_parameter_defaults: dict[str, str] = {}
    ) -> None:
        """Remember the system parameter defaults to hand to every MzStart.

        Args:
            additional_system_parameter_defaults: Extra system parameter
                defaults. The mapping is copied, so neither the caller's dict
                nor the shared ``{}`` default is ever mutated.
        """
        self.additional_system_parameter_defaults = dict(
            additional_system_parameter_defaults or {}
        )

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Create one MzStart action, handing it a private copy of the parameters."""
        return [
            MzStart(
                capabilities=capabilities,
                # Copy so the action can never mutate the factory's mapping.
                additional_system_parameter_defaults=dict(
                    self.additional_system_parameter_defaults
                ),
            )
        ]


class MzStart(Action):
    """Starts a Mz instance (all components are running in the same container)."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {CockroachIsRunning, BlobStoreIsRunning}

    @classmethod
    def incompatible_with(cls) -> set[type[Capability]]:
        return {MzIsRunning}

    def __init__(
        self,
        capabilities: Capabilities,
        additional_system_parameter_defaults: dict[str, str] = {},
    ) -> None:
        """Prepare the start action.

        Args:
            capabilities: Passed through to ``Action.__init__``.
            additional_system_parameter_defaults: Extra system parameter defaults
                to start Mz with. The mapping is copied, so neither the caller's
                dict nor the shared ``{}`` default is ever mutated.

        Raises:
            ValueError: If a non-empty entry of the ``;``-separated
                ``CI_MZ_SYSTEM_PARAMETER_DEFAULT`` environment variable is not of
                the form ``<key>=<val>``.
        """
        # Copy instead of aliasing: the CI overrides merged in below must not
        # leak into the caller's dict (e.g. the one MzStartParameterized keeps
        # for every MzStart it creates).
        self.additional_system_parameter_defaults = dict(
            additional_system_parameter_defaults or {}
        )
        system_parameter_default = os.getenv("CI_MZ_SYSTEM_PARAMETER_DEFAULT", "")
        for val in system_parameter_default.split(";"):
            if not val:
                # Skip empty segments: an unset variable or a trailing ';'.
                continue
            key, sep, value = val.partition("=")
            # Explicit check rather than `assert`, which `python -O` strips.
            if not sep:
                raise ValueError(
                    f"CI_MZ_SYSTEM_PARAMETER_DEFAULT '{val}' should be the format <key>=<val>"
                )
            self.additional_system_parameter_defaults[key] = value

        super().__init__(capabilities)

    def run(self, c: Composition, state: State) -> None:
        """Start Mz, then apply zippy's system configuration as ``mz_system``."""
        print(
            f"Starting Mz with additional_system_parameter_defaults = {self.additional_system_parameter_defaults}"
        )

        with c.override(
            Materialized(
                name=state.mz_service,
                external_blob_store=True,
                blob_store_is_azure=c.blob_store() == "azurite",
                external_metadata_store=True,
                deploy_generation=state.deploy_generation,
                system_parameter_defaults=state.system_parameter_defaults,
                sanity_restart=False,
                restart="on-failure",
                additional_system_parameter_defaults=self.additional_system_parameter_defaults,
                metadata_store="cockroach",
                default_replication_factor=2,
            )
        ):
            c.up(state.mz_service)

        # Raise the object-count limits so long Zippy runs do not hit them.
        for config_param in [
            "max_tables",
            "max_sources",
            "max_objects_per_schema",
            "max_materialized_views",
            "max_sinks",
        ]:
            c.sql(
                f"ALTER SYSTEM SET {config_param} TO 1000",
                user="mz_system",
                port=6877,
                print_statement=False,
                service=state.mz_service,
            )

        c.sql(
            """
            ALTER CLUSTER quickstart SET (MANAGED = false);
            """,
            user="mz_system",
            port=6877,
            service=state.mz_service,
        )

        # Make sure all eligible LIMIT queries use the PeekPersist optimization
        c.sql(
            "ALTER SYSTEM SET persist_fast_path_limit = 1000000000",
            user="mz_system",
            port=6877,
            service=state.mz_service,
        )

    def provides(self) -> list[Capability]:
        return [MzIsRunning()]


class MzStop(Action):
    """Stops the entire Mz instance (all components are running in the same container)."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        # Technically speaking, we do not need balancerd to be up in order to kill Mz
        # However, without this protection we frequently end up in a situation where
        # both are down and Zippy enters a prolonged period of restarting one or the
        # other and no other useful work can be performed in the meantime.
        return {MzIsRunning, BalancerdIsRunning}

    def run(self, c: Composition, state: State) -> None:
        c.kill(state.mz_service)

    def withholds(self) -> set[type[Capability]]:
        return {MzIsRunning}


class MzRestart(Action):
    """Restarts the entire Mz instance (all components are running in the same container)."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {MzIsRunning}

    def run(self, c: Composition, state: State) -> None:
        # NOTE(review): unlike MzStart, this override does not pass
        # additional_system_parameter_defaults, so parameters supplied via
        # MzStartParameterized are presumably not re-applied on restart --
        # confirm this is intended.
        with c.override(
            Materialized(
                name=state.mz_service,
                external_blob_store=True,
                blob_store_is_azure=c.blob_store() == "azurite",
                external_metadata_store=True,
                deploy_generation=state.deploy_generation,
                system_parameter_defaults=state.system_parameter_defaults,
                sanity_restart=False,
                restart="on-failure",
                metadata_store="cockroach",
                default_replication_factor=2,
            )
        ):
            c.kill(state.mz_service)
            c.up(state.mz_service)


class Mz0dtDeploy(Mz0dtDeployBaseAction):
    """Switches Mz to a new deployment using 0dt."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {MzIsRunning}

    def run(self, c: Composition, state: State) -> None:
        """Bring up the next generation on the other service, promote it, stop the old one."""
        state.deploy_generation += 1

        # Even generations run on "materialized", odd ones on "materialized2".
        state.mz_service = (
            "materialized" if state.deploy_generation % 2 == 0 else "materialized2"
        )

        print(f"Deploying generation {state.deploy_generation} on {state.mz_service}")

        # NOTE(review): additional_system_parameter_defaults from MzStart are not
        # passed here either -- confirm this is intended.
        with c.override(
            Materialized(
                name=state.mz_service,
                external_blob_store=True,
                blob_store_is_azure=c.blob_store() == "azurite",
                external_metadata_store=True,
                deploy_generation=state.deploy_generation,
                system_parameter_defaults=state.system_parameter_defaults,
                sanity_restart=False,
                restart="on-failure",
                healthcheck=LEADER_STATUS_HEALTHCHECK,
                metadata_store="cockroach",
                default_replication_factor=2,
            ),
        ):
            c.up(state.mz_service, detach=True)
            c.await_mz_deployment_status(
                DeploymentStatus.READY_TO_PROMOTE, state.mz_service
            )
            c.promote_mz(state.mz_service)
            c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, state.mz_service)
            c.stop(
                (
                    "materialized2"
                    if state.mz_service == "materialized"
                    else "materialized"
                ),
                wait=True,
            )


class KillClusterd(Action):
    """Kills the clusterd processes in the environmentd container. The process orchestrator will restart them."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {MzIsRunning, ViewExists}

    def run(self, c: Composition, state: State) -> None:
        # Depending on the workload, clusterd may not be running, hence the || true
        c.exec(state.mz_service, "bash", "-c", "kill -9 `pidof clusterd` || true")
class MzStartParameterized(ActionFactory):
    """Starts a Mz instance with custom parameters.

    Each call to ``new`` yields a single ``MzStart`` action configured with the
    additional system parameter defaults given to the constructor.
    """

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {CockroachIsRunning, BlobStoreIsRunning}

    @classmethod
    def incompatible_with(cls) -> set[type[Capability]]:
        return {MzIsRunning}

    def __init__(
        self, additional_system_parameter_defaults: dict[str, str] = {}
    ) -> None:
        """Remember the system parameter defaults to hand to every MzStart.

        Args:
            additional_system_parameter_defaults: Extra system parameter
                defaults. The mapping is copied, so neither the caller's dict
                nor the shared ``{}`` default is ever mutated.
        """
        self.additional_system_parameter_defaults = dict(
            additional_system_parameter_defaults or {}
        )

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Create one MzStart action, handing it a private copy of the parameters."""
        return [
            MzStart(
                capabilities=capabilities,
                # MzStart merges CI overrides into the dict it receives; a copy
                # keeps those from accumulating in the factory's mapping.
                additional_system_parameter_defaults=dict(
                    self.additional_system_parameter_defaults
                ),
            )
        ]
Starts a Mz instance with custom parameters.
@classmethod
def requires(cls) -> set[type[Capability]]:
    """Return the capability classes the factory needs before creating actions.

    Both ``CockroachIsRunning`` and ``BlobStoreIsRunning`` must be present.
    """
    needed: set[type[Capability]] = {BlobStoreIsRunning, CockroachIsRunning}
    return needed
Compute the capability classes that this Action Factory requires.
The capability classes that this action is not compatible with.
class MzStart(Action):
    """Starts a Mz instance (all components are running in the same container)."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        return {CockroachIsRunning, BlobStoreIsRunning}

    @classmethod
    def incompatible_with(cls) -> set[type[Capability]]:
        return {MzIsRunning}

    def __init__(
        self,
        capabilities: Capabilities,
        additional_system_parameter_defaults: dict[str, str] = {},
    ) -> None:
        """Prepare the start action.

        Args:
            capabilities: Passed through to ``Action.__init__``.
            additional_system_parameter_defaults: Extra system parameter defaults
                to start Mz with. The mapping is copied, so neither the caller's
                dict nor the shared ``{}`` default is ever mutated.

        Raises:
            ValueError: If a non-empty entry of the ``;``-separated
                ``CI_MZ_SYSTEM_PARAMETER_DEFAULT`` environment variable is not of
                the form ``<key>=<val>``.
        """
        # Copy instead of aliasing: the CI overrides merged in below must not
        # leak into the caller's dict.
        self.additional_system_parameter_defaults = dict(
            additional_system_parameter_defaults or {}
        )
        system_parameter_default = os.getenv("CI_MZ_SYSTEM_PARAMETER_DEFAULT", "")
        for val in system_parameter_default.split(";"):
            if not val:
                # Skip empty segments: an unset variable or a trailing ';'.
                continue
            key, sep, value = val.partition("=")
            # Explicit check rather than `assert`, which `python -O` strips.
            if not sep:
                raise ValueError(
                    f"CI_MZ_SYSTEM_PARAMETER_DEFAULT '{val}' should be the format <key>=<val>"
                )
            self.additional_system_parameter_defaults[key] = value

        super().__init__(capabilities)

    def run(self, c: Composition, state: State) -> None:
        """Start Mz, then apply zippy's system configuration as ``mz_system``."""
        print(
            f"Starting Mz with additional_system_parameter_defaults = {self.additional_system_parameter_defaults}"
        )

        with c.override(
            Materialized(
                name=state.mz_service,
                external_blob_store=True,
                blob_store_is_azure=c.blob_store() == "azurite",
                external_metadata_store=True,
                deploy_generation=state.deploy_generation,
                system_parameter_defaults=state.system_parameter_defaults,
                sanity_restart=False,
                restart="on-failure",
                additional_system_parameter_defaults=self.additional_system_parameter_defaults,
                metadata_store="cockroach",
                default_replication_factor=2,
            )
        ):
            c.up(state.mz_service)

        # Raise the object-count limits so long Zippy runs do not hit them.
        for config_param in [
            "max_tables",
            "max_sources",
            "max_objects_per_schema",
            "max_materialized_views",
            "max_sinks",
        ]:
            c.sql(
                f"ALTER SYSTEM SET {config_param} TO 1000",
                user="mz_system",
                port=6877,
                print_statement=False,
                service=state.mz_service,
            )

        c.sql(
            """
            ALTER CLUSTER quickstart SET (MANAGED = false);
            """,
            user="mz_system",
            port=6877,
            service=state.mz_service,
        )

        # Make sure all eligible LIMIT queries use the PeekPersist optimization
        c.sql(
            "ALTER SYSTEM SET persist_fast_path_limit = 1000000000",
            user="mz_system",
            port=6877,
            service=state.mz_service,
        )

    def provides(self) -> list[Capability]:
        return [MzIsRunning()]
Starts a Mz instance (all components are running in the same container).
def __init__(
    self,
    capabilities: Capabilities,
    additional_system_parameter_defaults: dict[str, str] = {},
) -> None:
    """Prepare the start action.

    Args:
        capabilities: Passed through to ``Action.__init__``.
        additional_system_parameter_defaults: Extra system parameter defaults to
            start Mz with. The mapping is copied, so neither the caller's dict
            nor the shared ``{}`` default is ever mutated.

    Raises:
        ValueError: If a non-empty entry of the ``;``-separated
            ``CI_MZ_SYSTEM_PARAMETER_DEFAULT`` environment variable is not of
            the form ``<key>=<val>``.
    """
    # Copy instead of aliasing: the CI overrides merged in below must not leak
    # into the caller's dict.
    self.additional_system_parameter_defaults = dict(
        additional_system_parameter_defaults or {}
    )
    system_parameter_default = os.getenv("CI_MZ_SYSTEM_PARAMETER_DEFAULT", "")
    for val in system_parameter_default.split(";"):
        if not val:
            # Skip empty segments: an unset variable or a trailing ';'.
            continue
        key, sep, value = val.partition("=")
        # Explicit check rather than `assert`, which `python -O` strips.
        if not sep:
            raise ValueError(
                f"CI_MZ_SYSTEM_PARAMETER_DEFAULT '{val}' should be the format <key>=<val>"
            )
        self.additional_system_parameter_defaults[key] = value

    super().__init__(capabilities)
Construct a new action, possibly conditioning on the available capabilities.
@classmethod
def requires(cls) -> set[type[Capability]]:
    """Return the capability classes that must exist before Mz can be started.

    Both ``CockroachIsRunning`` and ``BlobStoreIsRunning`` are required.
    """
    prerequisites: set[type[Capability]] = {BlobStoreIsRunning, CockroachIsRunning}
    return prerequisites
Compute the capability classes that this action requires.
The capability classes that this action is not compatible with.
def run(self, c: Composition, state: State) -> None:
    """Bring up the Mz service and apply zippy's system configuration.

    The service is started with this action's additional system parameter
    defaults. Then, as ``mz_system`` on port 6877, several object limits are
    set to 1000, the ``quickstart`` cluster is set to ``MANAGED = false``, and
    ``persist_fast_path_limit`` is raised.
    """
    print(
        f"Starting Mz with additional_system_parameter_defaults = {self.additional_system_parameter_defaults}"
    )

    service_definition = Materialized(
        name=state.mz_service,
        external_blob_store=True,
        blob_store_is_azure=c.blob_store() == "azurite",
        external_metadata_store=True,
        deploy_generation=state.deploy_generation,
        system_parameter_defaults=state.system_parameter_defaults,
        sanity_restart=False,
        restart="on-failure",
        additional_system_parameter_defaults=self.additional_system_parameter_defaults,
        metadata_store="cockroach",
        default_replication_factor=2,
    )
    with c.override(service_definition):
        c.up(state.mz_service)

    def as_system(statement: str, **options: bool) -> None:
        # Every statement below runs as mz_system against port 6877.
        c.sql(
            statement,
            user="mz_system",
            port=6877,
            service=state.mz_service,
            **options,
        )

    raised_limits = (
        "max_tables",
        "max_sources",
        "max_objects_per_schema",
        "max_materialized_views",
        "max_sinks",
    )
    for limit_name in raised_limits:
        as_system(f"ALTER SYSTEM SET {limit_name} TO 1000", print_statement=False)

    as_system(
        """
        ALTER CLUSTER quickstart SET (MANAGED = false);
        """
    )

    # Make sure all eligible LIMIT queries use the PeekPersist optimization
    as_system("ALTER SYSTEM SET persist_fast_path_limit = 1000000000")
Run this action on the provided composition.
class MzStop(Action):
    """Stops the entire Mz instance (all components are running in the same container).

    The Mz service container is killed and ``MzIsRunning`` is withheld.
    """

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        # Killing Mz does not strictly need balancerd. Requiring it anyway keeps
        # Zippy out of the situation where both are down and it spends a long
        # stretch restarting one or the other instead of doing useful work.
        return {BalancerdIsRunning, MzIsRunning}

    def run(self, c: Composition, state: State) -> None:
        target_service = state.mz_service
        c.kill(target_service)

    def withholds(self) -> set[type[Capability]]:
        lost: set[type[Capability]] = {MzIsRunning}
        return lost
Stops the entire Mz instance (all components are running in the same container).
@classmethod
def requires(cls) -> set[type[Capability]]:
    """Return the capability classes needed before Mz may be stopped."""
    # Balancerd is not strictly needed to kill Mz. Requiring it prevents Zippy
    # from frequently ending up with both down and spending a prolonged period
    # restarting one or the other, with no useful work possible meanwhile.
    needed: set[type[Capability]] = {BalancerdIsRunning, MzIsRunning}
    return needed
Compute the capability classes that this action requires.
class MzRestart(Action):
    """Restarts the entire Mz instance (all components are running in the same container).

    The service is killed and brought back up under a ``Materialized``
    override built from the current ``state``.
    """

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        needed: set[type[Capability]] = {MzIsRunning}
        return needed

    def run(self, c: Composition, state: State) -> None:
        service_name = state.mz_service
        # NOTE(review): unlike MzStart, no additional_system_parameter_defaults
        # are passed here, so parameters supplied via MzStartParameterized are
        # presumably not re-applied on restart -- confirm this is intended.
        definition = Materialized(
            name=service_name,
            external_blob_store=True,
            blob_store_is_azure=c.blob_store() == "azurite",
            external_metadata_store=True,
            deploy_generation=state.deploy_generation,
            system_parameter_defaults=state.system_parameter_defaults,
            sanity_restart=False,
            restart="on-failure",
            metadata_store="cockroach",
            default_replication_factor=2,
        )
        with c.override(definition):
            c.kill(service_name)
            c.up(service_name)
Restarts the entire Mz instance (all components are running in the same container).
Compute the capability classes that this action requires.
def run(self, c: Composition, state: State) -> None:
    """Kill the Mz service and start it again under a fresh Materialized override."""
    service_name = state.mz_service
    # NOTE(review): additional_system_parameter_defaults are not passed here,
    # unlike in MzStart -- confirm this is intended.
    definition = Materialized(
        name=service_name,
        external_blob_store=True,
        blob_store_is_azure=c.blob_store() == "azurite",
        external_metadata_store=True,
        deploy_generation=state.deploy_generation,
        system_parameter_defaults=state.system_parameter_defaults,
        sanity_restart=False,
        restart="on-failure",
        metadata_store="cockroach",
        default_replication_factor=2,
    )
    with c.override(definition):
        c.kill(service_name)
        c.up(service_name)
Run this action on the provided composition.
class Mz0dtDeploy(Mz0dtDeployBaseAction):
    """Switches Mz to a new deployment using 0dt.

    Generations alternate between the ``materialized`` (even) and
    ``materialized2`` (odd) services.
    """

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        needed: set[type[Capability]] = {MzIsRunning}
        return needed

    def run(self, c: Composition, state: State) -> None:
        state.deploy_generation += 1
        generation = state.deploy_generation

        state.mz_service = "materialized" if generation % 2 == 0 else "materialized2"
        incoming = state.mz_service
        outgoing = "materialized2" if incoming == "materialized" else "materialized"

        print(f"Deploying generation {state.deploy_generation} on {state.mz_service}")

        definition = Materialized(
            name=incoming,
            external_blob_store=True,
            blob_store_is_azure=c.blob_store() == "azurite",
            external_metadata_store=True,
            deploy_generation=generation,
            system_parameter_defaults=state.system_parameter_defaults,
            sanity_restart=False,
            restart="on-failure",
            healthcheck=LEADER_STATUS_HEALTHCHECK,
            metadata_store="cockroach",
            default_replication_factor=2,
        )
        with c.override(definition):
            # Start detached, wait until promotable, promote, wait for
            # leadership, then stop the previous generation's service.
            c.up(incoming, detach=True)
            c.await_mz_deployment_status(DeploymentStatus.READY_TO_PROMOTE, incoming)
            c.promote_mz(incoming)
            c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, incoming)
            c.stop(outgoing, wait=True)
Switches Mz to a new deployment using 0dt.
Compute the capability classes that this action requires.
def run(self, c: Composition, state: State) -> None:
    """Start the next deploy generation on the other service and promote it.

    Even generations run on ``materialized`` and odd ones on ``materialized2``.
    The new service is started detached, awaited until ready to promote,
    promoted, awaited until it is leader, and then the other service is stopped.
    """
    state.deploy_generation += 1
    generation = state.deploy_generation

    state.mz_service = "materialized" if generation % 2 == 0 else "materialized2"
    incoming = state.mz_service
    outgoing = "materialized2" if incoming == "materialized" else "materialized"

    print(f"Deploying generation {state.deploy_generation} on {state.mz_service}")

    definition = Materialized(
        name=incoming,
        external_blob_store=True,
        blob_store_is_azure=c.blob_store() == "azurite",
        external_metadata_store=True,
        deploy_generation=generation,
        system_parameter_defaults=state.system_parameter_defaults,
        sanity_restart=False,
        restart="on-failure",
        healthcheck=LEADER_STATUS_HEALTHCHECK,
        metadata_store="cockroach",
        default_replication_factor=2,
    )
    with c.override(definition):
        c.up(incoming, detach=True)
        c.await_mz_deployment_status(DeploymentStatus.READY_TO_PROMOTE, incoming)
        c.promote_mz(incoming)
        c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, incoming)
        c.stop(outgoing, wait=True)
Run this action on the provided composition.
class KillClusterd(Action):
    """Kills the clusterd processes in the environmentd container. The process orchestrator will restart them."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        needed: set[type[Capability]] = {ViewExists, MzIsRunning}
        return needed

    def run(self, c: Composition, state: State) -> None:
        # Depending on the workload, clusterd may not be running, hence the || true
        kill_command = "kill -9 `pidof clusterd` || true"
        c.exec(state.mz_service, "bash", "-c", kill_command)
Kills the clusterd processes in the environmentd container. The process orchestrator will restart them.
@classmethod
def requires(cls) -> set[type[Capability]]:
    """Return the capability classes needed before clusterd can be killed.

    Both ``MzIsRunning`` and ``ViewExists`` must be present.
    """
    needed: set[type[Capability]] = {ViewExists, MzIsRunning}
    return needed
Compute the capability classes that this action requires.
def run(self, c: Composition, state: State) -> None:
    """SIGKILL every clusterd process inside the Mz service container via bash."""
    # Depending on the workload, clusterd may not be running, hence the || true
    kill_command = "kill -9 `pidof clusterd` || true"
    c.exec(state.mz_service, "bash", "-c", kill_command)
Run this action on the provided composition.