misc.python.materialize.mz_0dt_upgrader
1# Copyright Materialize, Inc. and contributors. All rights reserved. 2# 3# Use of this software is governed by the Business Source License 4# included in the LICENSE file at the root of this repository. 5# 6# As of the Change Date specified in that file, in accordance with 7# the Business Source License, use of this software will be governed 8# by the Apache License, Version 2.0. 9 10from collections.abc import Callable 11from dataclasses import dataclass 12from random import Random 13from typing import TypedDict 14 15from materialize.mz_version import MzVersion 16from materialize.mzcompose import get_default_system_parameters 17from materialize.mzcompose.composition import Composition 18from materialize.mzcompose.services.materialized import DeploymentStatus, Materialized 19from materialize.mzcompose.services.metadata_store import METADATA_STORE 20 21 22class MaterializedUpgradeArgs(TypedDict): 23 """Arguments for the Materialized service constructor required for 0dt upgrades.""" 24 25 name: str 26 image: str | None 27 deploy_generation: int 28 system_parameter_defaults: dict[str, str] 29 external_metadata_store: bool 30 restart: str 31 32 33@dataclass 34class UpgradeStep: 35 """Represents a single upgrade step with its service name and action.""" 36 37 new_service: Materialized 38 previous_service: Materialized 39 upgrade: Callable[[], None] 40 41 42def generate_materialized_upgrade_args( 43 versions: list[MzVersion | None], 44) -> list[MaterializedUpgradeArgs]: 45 """ 46 Constructs a list of required Materialized arguments for 0dt upgrades. 47 Requires there to be an mz_1 and mz_2 service already in the composition. 48 """ 49 # We use the first version to get the system parameters since the defaults for 50 # newer versions include cutting edge features than can break backwards compatibility. 51 # TODO (multiversion1): Get minimal system parameters by default to avoid cutting edge features. 52 system_parameter_defaults = get_default_system_parameters(versions[0]) 53 54 return [ 55 MaterializedUpgradeArgs( 56 image=f"materialize/materialized:{version}" if version else None, 57 # Cycle through mz_1 and mz_2 for upgrades since spinning up services have a cost. 58 name=f"mz_{(i % 2) + 1}", 59 # Generation number for the service. Required to start services in read only mode. 60 deploy_generation=i, 61 system_parameter_defaults=system_parameter_defaults, 62 # To share the same metadata store between services 63 external_metadata_store=True, 64 # To restart when container exits due to promotion 65 restart="on-failure", 66 ) 67 for i, version in enumerate(versions) 68 ] 69 70 71def generate_random_upgrade_path( 72 versions: list[MzVersion], 73 rng: Random | None = None, 74 max_versions: int = 6, 75) -> list[MzVersion]: 76 """ 77 Generates a random upgrade path between the given versions. 78 """ 79 selected_versions = [] 80 81 rng = rng or Random() 82 # For each version in the input list, randomly select it with a 1/3 chance. 83 for v in versions: 84 if rng.random() < 0.33: 85 selected_versions.append(v) 86 87 # Always include at least one version to avoid empty paths. 88 if len(selected_versions) == 0: 89 selected_versions.append(rng.choice(versions)) 90 91 # Cap the number of versions to keep test runtime bounded. 92 if len(selected_versions) > max_versions: 93 selected_versions = sorted(rng.sample(selected_versions, max_versions)) 94 95 return selected_versions 96 97 98class Materialized0dtUpgrader: 99 """ 100 Manages a sequence of Materialized service upgrades using zero-downtime deployments. 101 102 Args: 103 materialized_services: List of Materialized instances representing each upgrade step 104 """ 105 106 def __init__(self, c: Composition, materialized_services: list[Materialized]): 107 self.materialized_services = materialized_services 108 self.c = c 109 110 def create_upgrade_steps_list(self) -> list[UpgradeStep]: 111 """ 112 Returns a list of upgrade step actions from the second service onward. 113 114 Each step is a closure that, when called, will perform 115 the upgrade step to the corresponding service. 116 """ 117 118 def create_upgrade_action( 119 current_service: Materialized, 120 previous_service: Materialized, 121 ): 122 def upgrade() -> None: 123 with self.c.override(current_service): 124 current_service_image = ( 125 current_service.config.get("image") or "current" 126 ) 127 previous_service_image = previous_service.config.get("image") 128 129 print(f"Bringing up {current_service_image}") 130 self.c.up(current_service.name) 131 print(f"Awaiting promotion of {current_service_image}") 132 self.c.await_mz_deployment_status( 133 DeploymentStatus.READY_TO_PROMOTE, current_service.name 134 ) 135 self.c.promote_mz(current_service.name) 136 print(f"Awaiting leader status of {current_service_image}") 137 self.c.await_mz_deployment_status( 138 DeploymentStatus.IS_LEADER, current_service.name 139 ) 140 141 print(f"Killing {previous_service_image}") 142 self.c.kill(previous_service.name, wait=True) 143 144 return upgrade 145 146 services = self.materialized_services 147 steps = [] 148 for idx in range(1, len(services)): 149 current_service = services[idx] 150 previous_service = services[idx - 1] 151 152 steps.append( 153 UpgradeStep( 154 new_service=current_service, 155 previous_service=previous_service, 156 upgrade=create_upgrade_action(current_service, previous_service), 157 ) 158 ) 159 return steps 160 161 def initialize(self) -> tuple[Materialized, list[UpgradeStep]]: 162 """ 163 Initialize the with the first service. Returns a list where 164 each step is a closure that, when called, will perform the upgrade step to the corresponding service. 165 """ 166 first_service = self.materialized_services[0] 167 with self.c.override(first_service): 168 print(f"Bringing up {first_service.name}") 169 self.c.up(first_service.name) 170 171 return first_service, self.create_upgrade_steps_list() 172 173 def print_upgrade_path(self) -> None: 174 """ 175 Print the upgrade steps. 176 """ 177 178 def image_to_string(image: str | None) -> str: 179 return "current" if image is None else image.split(":")[-1] 180 181 print( 182 f"Upgrade path: {str.join(' -> ', [image_to_string(service.config.get('image')) for service in self.materialized_services])}" 183 ) 184 185 def cleanup(self) -> None: 186 """ 187 Cleanup after upgrade. 188 """ 189 print("Cleaning up upgrade path") 190 # Ensure all services are killed and removed 191 self.c.kill( 192 *[service.name for service in self.materialized_services], wait=True 193 ) 194 self.c.rm( 195 *[service.name for service in self.materialized_services], 196 destroy_volumes=True, 197 ) 198 self.c.kill(METADATA_STORE, wait=True) 199 self.c.rm( 200 METADATA_STORE, 201 destroy_volumes=True, 202 )
23class MaterializedUpgradeArgs(TypedDict): 24 """Arguments for the Materialized service constructor required for 0dt upgrades.""" 25 26 name: str 27 image: str | None 28 deploy_generation: int 29 system_parameter_defaults: dict[str, str] 30 external_metadata_store: bool 31 restart: str
Arguments for the Materialized service constructor required for 0dt upgrades.
34@dataclass 35class UpgradeStep: 36 """Represents a single upgrade step with its service name and action.""" 37 38 new_service: Materialized 39 previous_service: Materialized 40 upgrade: Callable[[], None]
Represents a single upgrade step with its service name and action.
43def generate_materialized_upgrade_args( 44 versions: list[MzVersion | None], 45) -> list[MaterializedUpgradeArgs]: 46 """ 47 Constructs a list of required Materialized arguments for 0dt upgrades. 48 Requires there to be an mz_1 and mz_2 service already in the composition. 49 """ 50 # We use the first version to get the system parameters since the defaults for 51 # newer versions include cutting edge features than can break backwards compatibility. 52 # TODO (multiversion1): Get minimal system parameters by default to avoid cutting edge features. 53 system_parameter_defaults = get_default_system_parameters(versions[0]) 54 55 return [ 56 MaterializedUpgradeArgs( 57 image=f"materialize/materialized:{version}" if version else None, 58 # Cycle through mz_1 and mz_2 for upgrades since spinning up services have a cost. 59 name=f"mz_{(i % 2) + 1}", 60 # Generation number for the service. Required to start services in read only mode. 61 deploy_generation=i, 62 system_parameter_defaults=system_parameter_defaults, 63 # To share the same metadata store between services 64 external_metadata_store=True, 65 # To restart when container exits due to promotion 66 restart="on-failure", 67 ) 68 for i, version in enumerate(versions) 69 ]
Constructs a list of required Materialized arguments for 0dt upgrades. Requires there to be an mz_1 and mz_2 service already in the composition.
72def generate_random_upgrade_path( 73 versions: list[MzVersion], 74 rng: Random | None = None, 75 max_versions: int = 6, 76) -> list[MzVersion]: 77 """ 78 Generates a random upgrade path between the given versions. 79 """ 80 selected_versions = [] 81 82 rng = rng or Random() 83 # For each version in the input list, randomly select it with a 1/3 chance. 84 for v in versions: 85 if rng.random() < 0.33: 86 selected_versions.append(v) 87 88 # Always include at least one version to avoid empty paths. 89 if len(selected_versions) == 0: 90 selected_versions.append(rng.choice(versions)) 91 92 # Cap the number of versions to keep test runtime bounded. 93 if len(selected_versions) > max_versions: 94 selected_versions = sorted(rng.sample(selected_versions, max_versions)) 95 96 return selected_versions
Generates a random upgrade path between the given versions.
99class Materialized0dtUpgrader: 100 """ 101 Manages a sequence of Materialized service upgrades using zero-downtime deployments. 102 103 Args: 104 materialized_services: List of Materialized instances representing each upgrade step 105 """ 106 107 def __init__(self, c: Composition, materialized_services: list[Materialized]): 108 self.materialized_services = materialized_services 109 self.c = c 110 111 def create_upgrade_steps_list(self) -> list[UpgradeStep]: 112 """ 113 Returns a list of upgrade step actions from the second service onward. 114 115 Each step is a closure that, when called, will perform 116 the upgrade step to the corresponding service. 117 """ 118 119 def create_upgrade_action( 120 current_service: Materialized, 121 previous_service: Materialized, 122 ): 123 def upgrade() -> None: 124 with self.c.override(current_service): 125 current_service_image = ( 126 current_service.config.get("image") or "current" 127 ) 128 previous_service_image = previous_service.config.get("image") 129 130 print(f"Bringing up {current_service_image}") 131 self.c.up(current_service.name) 132 print(f"Awaiting promotion of {current_service_image}") 133 self.c.await_mz_deployment_status( 134 DeploymentStatus.READY_TO_PROMOTE, current_service.name 135 ) 136 self.c.promote_mz(current_service.name) 137 print(f"Awaiting leader status of {current_service_image}") 138 self.c.await_mz_deployment_status( 139 DeploymentStatus.IS_LEADER, current_service.name 140 ) 141 142 print(f"Killing {previous_service_image}") 143 self.c.kill(previous_service.name, wait=True) 144 145 return upgrade 146 147 services = self.materialized_services 148 steps = [] 149 for idx in range(1, len(services)): 150 current_service = services[idx] 151 previous_service = services[idx - 1] 152 153 steps.append( 154 UpgradeStep( 155 new_service=current_service, 156 previous_service=previous_service, 157 upgrade=create_upgrade_action(current_service, previous_service), 158 ) 159 ) 160 return steps 161 162 def initialize(self) -> tuple[Materialized, list[UpgradeStep]]: 163 """ 164 Initialize the with the first service. Returns a list where 165 each step is a closure that, when called, will perform the upgrade step to the corresponding service. 166 """ 167 first_service = self.materialized_services[0] 168 with self.c.override(first_service): 169 print(f"Bringing up {first_service.name}") 170 self.c.up(first_service.name) 171 172 return first_service, self.create_upgrade_steps_list() 173 174 def print_upgrade_path(self) -> None: 175 """ 176 Print the upgrade steps. 177 """ 178 179 def image_to_string(image: str | None) -> str: 180 return "current" if image is None else image.split(":")[-1] 181 182 print( 183 f"Upgrade path: {str.join(' -> ', [image_to_string(service.config.get('image')) for service in self.materialized_services])}" 184 ) 185 186 def cleanup(self) -> None: 187 """ 188 Cleanup after upgrade. 189 """ 190 print("Cleaning up upgrade path") 191 # Ensure all services are killed and removed 192 self.c.kill( 193 *[service.name for service in self.materialized_services], wait=True 194 ) 195 self.c.rm( 196 *[service.name for service in self.materialized_services], 197 destroy_volumes=True, 198 ) 199 self.c.kill(METADATA_STORE, wait=True) 200 self.c.rm( 201 METADATA_STORE, 202 destroy_volumes=True, 203 )
Manages a sequence of Materialized service upgrades using zero-downtime deployments.
Args: materialized_services: List of Materialized instances representing each upgrade step
111 def create_upgrade_steps_list(self) -> list[UpgradeStep]: 112 """ 113 Returns a list of upgrade step actions from the second service onward. 114 115 Each step is a closure that, when called, will perform 116 the upgrade step to the corresponding service. 117 """ 118 119 def create_upgrade_action( 120 current_service: Materialized, 121 previous_service: Materialized, 122 ): 123 def upgrade() -> None: 124 with self.c.override(current_service): 125 current_service_image = ( 126 current_service.config.get("image") or "current" 127 ) 128 previous_service_image = previous_service.config.get("image") 129 130 print(f"Bringing up {current_service_image}") 131 self.c.up(current_service.name) 132 print(f"Awaiting promotion of {current_service_image}") 133 self.c.await_mz_deployment_status( 134 DeploymentStatus.READY_TO_PROMOTE, current_service.name 135 ) 136 self.c.promote_mz(current_service.name) 137 print(f"Awaiting leader status of {current_service_image}") 138 self.c.await_mz_deployment_status( 139 DeploymentStatus.IS_LEADER, current_service.name 140 ) 141 142 print(f"Killing {previous_service_image}") 143 self.c.kill(previous_service.name, wait=True) 144 145 return upgrade 146 147 services = self.materialized_services 148 steps = [] 149 for idx in range(1, len(services)): 150 current_service = services[idx] 151 previous_service = services[idx - 1] 152 153 steps.append( 154 UpgradeStep( 155 new_service=current_service, 156 previous_service=previous_service, 157 upgrade=create_upgrade_action(current_service, previous_service), 158 ) 159 ) 160 return steps
Returns a list of upgrade step actions from the second service onward.
Each step is a closure that, when called, will perform the upgrade step to the corresponding service.
162 def initialize(self) -> tuple[Materialized, list[UpgradeStep]]: 163 """ 164 Initialize the with the first service. Returns a list where 165 each step is a closure that, when called, will perform the upgrade step to the corresponding service. 166 """ 167 first_service = self.materialized_services[0] 168 with self.c.override(first_service): 169 print(f"Bringing up {first_service.name}") 170 self.c.up(first_service.name) 171 172 return first_service, self.create_upgrade_steps_list()
Initialize the with the first service. Returns a list where each step is a closure that, when called, will perform the upgrade step to the corresponding service.
174 def print_upgrade_path(self) -> None: 175 """ 176 Print the upgrade steps. 177 """ 178 179 def image_to_string(image: str | None) -> str: 180 return "current" if image is None else image.split(":")[-1] 181 182 print( 183 f"Upgrade path: {str.join(' -> ', [image_to_string(service.config.get('image')) for service in self.materialized_services])}" 184 )
Print the upgrade steps.
186 def cleanup(self) -> None: 187 """ 188 Cleanup after upgrade. 189 """ 190 print("Cleaning up upgrade path") 191 # Ensure all services are killed and removed 192 self.c.kill( 193 *[service.name for service in self.materialized_services], wait=True 194 ) 195 self.c.rm( 196 *[service.name for service in self.materialized_services], 197 destroy_volumes=True, 198 ) 199 self.c.kill(METADATA_STORE, wait=True) 200 self.c.rm( 201 METADATA_STORE, 202 destroy_volumes=True, 203 )
Cleanup after upgrade.