misc.python.materialize.mz_0dt_upgrader
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

from collections.abc import Callable
from dataclasses import dataclass
from random import Random
from typing import TypedDict

from materialize.mz_version import MzVersion
from materialize.mzcompose import get_default_system_parameters
from materialize.mzcompose.composition import Composition
from materialize.mzcompose.services.materialized import DeploymentStatus, Materialized
from materialize.mzcompose.services.metadata_store import METADATA_STORE


class MaterializedUpgradeArgs(TypedDict):
    """Arguments for the Materialized service constructor required for 0dt upgrades."""

    # Service name; generate_materialized_upgrade_args alternates mz_1 / mz_2.
    name: str
    # Image reference, or None when no version was given for the step.
    image: str | None
    # Generation number for the service.
    deploy_generation: int
    # System parameter defaults handed to the service.
    system_parameter_defaults: dict[str, str]
    # Whether the service uses the shared external metadata store.
    external_metadata_store: bool
    # Container restart policy.
    restart: str


@dataclass
class UpgradeStep:
    """A single upgrade step: the services involved and the action to run."""

    # Service that is brought up and promoted by this step.
    new_service: Materialized
    # Service that is killed once the new one is the leader.
    previous_service: Materialized
    # Zero-argument callable that performs the step.
    upgrade: Callable[[], None]


def generate_materialized_upgrade_args(
    versions: list[MzVersion | None],
) -> list[MaterializedUpgradeArgs]:
    """
    Constructs a list of required Materialized arguments for 0dt upgrades.
    Requires there to be an mz_1 and mz_2 service already in the composition.

    Args:
        versions: Versions to step through, in upgrade order. A falsy entry
            (e.g. None) produces an entry with ``image=None``.

    Returns:
        One MaterializedUpgradeArgs per entry of ``versions``, in the same
        order. An empty ``versions`` yields an empty list.
    """
    # An empty path has no first version to derive the parameters from.
    if not versions:
        return []

    # We use the first version to get the system parameters since the defaults for
    # newer versions include cutting edge features that can break backwards compatibility.
    # TODO (multiversion1): Get minimal system parameters by default to avoid cutting edge features.
    system_parameter_defaults = get_default_system_parameters(versions[0])

    return [
        MaterializedUpgradeArgs(
            image=f"materialize/materialized:{version}" if version else None,
            # Cycle through mz_1 and mz_2 for upgrades since spinning up services has a cost.
            name=f"mz_{(i % 2) + 1}",
            # Generation number for the service. Required to start services in read only mode.
            deploy_generation=i,
            # Each entry gets its own copy so that adjusting one step's
            # parameters cannot leak into the other steps.
            system_parameter_defaults=dict(system_parameter_defaults),
            # To share the same metadata store between services
            external_metadata_store=True,
            # To restart when container exits due to promotion
            restart="on-failure",
        )
        for i, version in enumerate(versions)
    ]


def generate_random_upgrade_path(
    versions: list[MzVersion],
    rng: Random | None = None,
) -> list[MzVersion]:
    """
    Generates a random upgrade path between the given versions.

    Args:
        versions: Candidate versions; their relative order is preserved.
        rng: Random source to draw from. A fresh ``Random()`` is used when None.

    Returns:
        A subset of ``versions`` in input order. It holds at least one version
        unless ``versions`` itself is empty, in which case it is empty.
    """
    # Nothing to choose from; rng.choice would raise on an empty sequence.
    if not versions:
        return []

    if rng is None:
        rng = Random()

    # For each version in the input list, randomly select it with a 50% chance.
    selected_versions = [v for v in versions if rng.random() < 0.5]

    # Always include at least one version to avoid empty paths.
    if not selected_versions:
        selected_versions.append(rng.choice(versions))

    return selected_versions


class Materialized0dtUpgrader:
    """
    Manages a sequence of Materialized service upgrades using zero-downtime deployments.

    Args:
        c: Composition used to bring up, promote, kill and remove the services
        materialized_services: List of Materialized instances representing each upgrade step
    """

    def __init__(self, c: Composition, materialized_services: list[Materialized]):
        self.materialized_services = materialized_services
        self.c = c

    def create_upgrade_steps_list(self) -> list[UpgradeStep]:
        """
        Returns a list of upgrade steps from the second service onward.

        Each step pairs a service with its predecessor and carries a closure
        that, when called, will perform the upgrade to that service. Fewer
        than two services yield an empty list.
        """

        def create_upgrade_action(
            current_service: Materialized,
            previous_service: Materialized,
        ) -> Callable[[], None]:
            # A factory is used so every closure binds its own pair of services.
            def upgrade() -> None:
                with self.c.override(current_service):
                    # A service with no image configured is labelled "current",
                    # as in print_upgrade_path.
                    current_service_image = (
                        current_service.config.get("image") or "current"
                    )
                    previous_service_image = (
                        previous_service.config.get("image") or "current"
                    )

                    print(f"Bringing up {current_service_image}")
                    self.c.up(current_service.name)
                    print(f"Awaiting promotion of {current_service_image}")
                    self.c.await_mz_deployment_status(
                        DeploymentStatus.READY_TO_PROMOTE, current_service.name
                    )
                    self.c.promote_mz(current_service.name)
                    print(f"Awaiting leader status of {current_service_image}")
                    self.c.await_mz_deployment_status(
                        DeploymentStatus.IS_LEADER, current_service.name
                    )

                    print(f"Killing {previous_service_image}")
                    self.c.kill(previous_service.name, wait=True)

            return upgrade

        services = self.materialized_services
        # Pair every service with the one that precedes it in the path.
        return [
            UpgradeStep(
                new_service=current_service,
                previous_service=previous_service,
                upgrade=create_upgrade_action(current_service, previous_service),
            )
            for previous_service, current_service in zip(services, services[1:])
        ]

    def initialize(self) -> tuple[Materialized, list[UpgradeStep]]:
        """
        Bring up the first service of the upgrade path.

        Returns the first service and the list of upgrade steps, where each
        step carries a closure that, when called, will perform the upgrade
        step to the corresponding service.
        """
        first_service = self.materialized_services[0]
        with self.c.override(first_service):
            print(f"Bringing up {first_service.name}")
            self.c.up(first_service.name)

        return first_service, self.create_upgrade_steps_list()

    def print_upgrade_path(self) -> None:
        """
        Print the upgrade path as image tags joined by arrows.
        """

        def image_to_string(image: str | None) -> str:
            # Keep only the text after the last colon of the image reference.
            return "current" if image is None else image.split(":")[-1]

        path = " -> ".join(
            image_to_string(service.config.get("image"))
            for service in self.materialized_services
        )
        print(f"Upgrade path: {path}")

    def cleanup(self) -> None:
        """
        Cleanup after upgrade: kill and remove every service on the path and
        the metadata store, destroying their volumes.
        """
        print("Cleaning up upgrade path")
        # Service names may repeat along the path (generate_materialized_upgrade_args
        # cycles through mz_1 and mz_2), so pass each name only once, in
        # first-seen order.
        service_names = list(
            dict.fromkeys(service.name for service in self.materialized_services)
        )
        # Ensure all services are killed and removed
        self.c.kill(*service_names, wait=True)
        self.c.rm(*service_names, destroy_volumes=True)
        self.c.kill(METADATA_STORE, wait=True)
        self.c.rm(
            METADATA_STORE,
            destroy_volumes=True,
        )
class MaterializedUpgradeArgs(TypedDict):
    """Arguments for the Materialized service constructor required for 0dt upgrades."""

    # Service name; generate_materialized_upgrade_args alternates mz_1 / mz_2.
    name: str
    # Image reference, or None when no version was given for the step.
    image: str | None
    # Generation number for the service.
    deploy_generation: int
    # System parameter defaults handed to the service.
    system_parameter_defaults: dict[str, str]
    # Whether the service uses the shared external metadata store.
    external_metadata_store: bool
    # Container restart policy.
    restart: str
Arguments for the Materialized service constructor required for 0dt upgrades.
@dataclass
class UpgradeStep:
    """A single upgrade step: the services involved and the action to run."""

    # Service that the step upgrades to.
    new_service: Materialized
    # Service that precedes new_service in the upgrade path.
    previous_service: Materialized
    # Zero-argument callable that performs the step.
    upgrade: Callable[[], None]
Represents a single upgrade step: the service being upgraded to, the service it replaces, and the action that performs the upgrade.
def generate_materialized_upgrade_args(
    versions: list[MzVersion | None],
) -> list[MaterializedUpgradeArgs]:
    """
    Constructs a list of required Materialized arguments for 0dt upgrades.
    Requires there to be an mz_1 and mz_2 service already in the composition.

    Args:
        versions: Versions to step through, in upgrade order. A falsy entry
            (e.g. None) produces an entry with ``image=None``.

    Returns:
        One MaterializedUpgradeArgs per entry of ``versions``, in the same
        order. An empty ``versions`` yields an empty list.
    """
    # An empty path has no first version to derive the parameters from.
    if not versions:
        return []

    # We use the first version to get the system parameters since the defaults for
    # newer versions include cutting edge features that can break backwards compatibility.
    # TODO (multiversion1): Get minimal system parameters by default to avoid cutting edge features.
    system_parameter_defaults = get_default_system_parameters(versions[0])

    return [
        MaterializedUpgradeArgs(
            image=f"materialize/materialized:{version}" if version else None,
            # Cycle through mz_1 and mz_2 for upgrades since spinning up services has a cost.
            name=f"mz_{(i % 2) + 1}",
            # Generation number for the service. Required to start services in read only mode.
            deploy_generation=i,
            # Each entry gets its own copy so that adjusting one step's
            # parameters cannot leak into the other steps.
            system_parameter_defaults=dict(system_parameter_defaults),
            # To share the same metadata store between services
            external_metadata_store=True,
            # To restart when container exits due to promotion
            restart="on-failure",
        )
        for i, version in enumerate(versions)
    ]
Constructs a list of required Materialized arguments for 0dt upgrades. Requires there to be an mz_1 and mz_2 service already in the composition.
def generate_random_upgrade_path(
    versions: list[MzVersion],
    rng: Random | None = None,
) -> list[MzVersion]:
    """
    Generates a random upgrade path between the given versions.

    Args:
        versions: Candidate versions; their relative order is preserved.
        rng: Random source to draw from. A fresh ``Random()`` is used when None.

    Returns:
        A subset of ``versions`` in input order. It holds at least one version
        unless ``versions`` itself is empty, in which case it is empty.
    """
    # Nothing to choose from; rng.choice would raise on an empty sequence.
    if not versions:
        return []

    if rng is None:
        rng = Random()

    # For each version in the input list, randomly select it with a 50% chance.
    selected_versions = [v for v in versions if rng.random() < 0.5]

    # Always include at least one version to avoid empty paths.
    if not selected_versions:
        selected_versions.append(rng.choice(versions))

    return selected_versions
Generates a random upgrade path between the given versions.
class Materialized0dtUpgrader:
    """
    Manages a sequence of Materialized service upgrades using zero-downtime deployments.

    Args:
        c: Composition used to bring up, promote, kill and remove the services
        materialized_services: List of Materialized instances representing each upgrade step
    """

    def __init__(self, c: Composition, materialized_services: list[Materialized]):
        self.materialized_services = materialized_services
        self.c = c

    def create_upgrade_steps_list(self) -> list[UpgradeStep]:
        """
        Returns a list of upgrade steps from the second service onward.

        Each step pairs a service with its predecessor and carries a closure
        that, when called, will perform the upgrade to that service. Fewer
        than two services yield an empty list.
        """

        def create_upgrade_action(
            current_service: Materialized,
            previous_service: Materialized,
        ) -> Callable[[], None]:
            # A factory is used so every closure binds its own pair of services.
            def upgrade() -> None:
                with self.c.override(current_service):
                    # A service with no image configured is labelled "current",
                    # as in print_upgrade_path.
                    current_service_image = (
                        current_service.config.get("image") or "current"
                    )
                    previous_service_image = (
                        previous_service.config.get("image") or "current"
                    )

                    print(f"Bringing up {current_service_image}")
                    self.c.up(current_service.name)
                    print(f"Awaiting promotion of {current_service_image}")
                    self.c.await_mz_deployment_status(
                        DeploymentStatus.READY_TO_PROMOTE, current_service.name
                    )
                    self.c.promote_mz(current_service.name)
                    print(f"Awaiting leader status of {current_service_image}")
                    self.c.await_mz_deployment_status(
                        DeploymentStatus.IS_LEADER, current_service.name
                    )

                    print(f"Killing {previous_service_image}")
                    self.c.kill(previous_service.name, wait=True)

            return upgrade

        services = self.materialized_services
        # Pair every service with the one that precedes it in the path.
        return [
            UpgradeStep(
                new_service=current_service,
                previous_service=previous_service,
                upgrade=create_upgrade_action(current_service, previous_service),
            )
            for previous_service, current_service in zip(services, services[1:])
        ]

    def initialize(self) -> tuple[Materialized, list[UpgradeStep]]:
        """
        Bring up the first service of the upgrade path.

        Returns the first service and the list of upgrade steps, where each
        step carries a closure that, when called, will perform the upgrade
        step to the corresponding service.
        """
        first_service = self.materialized_services[0]
        with self.c.override(first_service):
            print(f"Bringing up {first_service.name}")
            self.c.up(first_service.name)

        return first_service, self.create_upgrade_steps_list()

    def print_upgrade_path(self) -> None:
        """
        Print the upgrade path as image tags joined by arrows.
        """

        def image_to_string(image: str | None) -> str:
            # Keep only the text after the last colon of the image reference.
            return "current" if image is None else image.split(":")[-1]

        path = " -> ".join(
            image_to_string(service.config.get("image"))
            for service in self.materialized_services
        )
        print(f"Upgrade path: {path}")

    def cleanup(self) -> None:
        """
        Cleanup after upgrade: kill and remove every service on the path and
        the metadata store, destroying their volumes.
        """
        print("Cleaning up upgrade path")
        # Service names may repeat along the path (generate_materialized_upgrade_args
        # cycles through mz_1 and mz_2), so pass each name only once, in
        # first-seen order.
        service_names = list(
            dict.fromkeys(service.name for service in self.materialized_services)
        )
        # Ensure all services are killed and removed
        self.c.kill(*service_names, wait=True)
        self.c.rm(*service_names, destroy_volumes=True)
        self.c.kill(METADATA_STORE, wait=True)
        self.c.rm(
            METADATA_STORE,
            destroy_volumes=True,
        )
Manages a sequence of Materialized service upgrades using zero-downtime deployments.
Args: materialized_services: List of Materialized instances representing each upgrade step
def create_upgrade_steps_list(self) -> list[UpgradeStep]:
    """
    Returns a list of upgrade steps from the second service onward.

    Each step pairs a service with its predecessor and carries a closure
    that, when called, will perform the upgrade to that service. Fewer
    than two services yield an empty list.
    """

    def create_upgrade_action(
        current_service: Materialized,
        previous_service: Materialized,
    ) -> Callable[[], None]:
        # A factory is used so every closure binds its own pair of services.
        def upgrade() -> None:
            with self.c.override(current_service):
                # A service with no image configured is labelled "current",
                # as in print_upgrade_path.
                current_service_image = (
                    current_service.config.get("image") or "current"
                )
                previous_service_image = (
                    previous_service.config.get("image") or "current"
                )

                print(f"Bringing up {current_service_image}")
                self.c.up(current_service.name)
                print(f"Awaiting promotion of {current_service_image}")
                self.c.await_mz_deployment_status(
                    DeploymentStatus.READY_TO_PROMOTE, current_service.name
                )
                self.c.promote_mz(current_service.name)
                print(f"Awaiting leader status of {current_service_image}")
                self.c.await_mz_deployment_status(
                    DeploymentStatus.IS_LEADER, current_service.name
                )

                print(f"Killing {previous_service_image}")
                self.c.kill(previous_service.name, wait=True)

        return upgrade

    services = self.materialized_services
    # Pair every service with the one that precedes it in the path.
    return [
        UpgradeStep(
            new_service=current_service,
            previous_service=previous_service,
            upgrade=create_upgrade_action(current_service, previous_service),
        )
        for previous_service, current_service in zip(services, services[1:])
    ]
Returns a list of upgrade step actions from the second service onward.
Each step is a closure that, when called, will perform the upgrade step to the corresponding service.
def initialize(self) -> tuple[Materialized, list[UpgradeStep]]:
    """
    Bring up the first service of the upgrade path.

    Returns the first service together with the list of upgrade steps; each
    step carries a closure that, when called, performs the upgrade to the
    corresponding service.
    """
    initial = self.materialized_services[0]

    with self.c.override(initial):
        service_name = initial.name
        print(f"Bringing up {service_name}")
        self.c.up(initial.name)

    upgrade_steps = self.create_upgrade_steps_list()
    return initial, upgrade_steps
Initialize the upgrade path by bringing up the first service. Returns that service together with a list of upgrade steps, where each step carries a closure that, when called, will perform the upgrade step to the corresponding service.
def print_upgrade_path(self) -> None:
    """
    Print the upgrade path on one line, as image tags joined by arrows.

    A service without an image is shown as "current"; otherwise only the text
    after the last colon of the image reference is shown.
    """
    labels = []
    for service in self.materialized_services:
        image = service.config.get("image")
        if image is None:
            labels.append("current")
        else:
            labels.append(image.split(":")[-1])

    print(f"Upgrade path: {' -> '.join(labels)}")
Print the upgrade steps.
def cleanup(self) -> None:
    """
    Cleanup after upgrade: kill and remove every service on the path and
    the metadata store, destroying their volumes.
    """
    print("Cleaning up upgrade path")
    # Service names may repeat along the path (generate_materialized_upgrade_args
    # cycles through mz_1 and mz_2), so pass each name only once, in
    # first-seen order.
    service_names = list(
        dict.fromkeys(service.name for service in self.materialized_services)
    )
    # Ensure all services are killed and removed
    self.c.kill(*service_names, wait=True)
    self.c.rm(*service_names, destroy_volumes=True)
    self.c.kill(METADATA_STORE, wait=True)
    self.c.rm(
        METADATA_STORE,
        destroy_volumes=True,
    )
Cleanup after upgrade.