misc.python.materialize.mz_0dt_upgrader

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10from collections.abc import Callable
 11from dataclasses import dataclass
 12from random import Random
 13from typing import TypedDict
 14
 15from materialize.mz_version import MzVersion
 16from materialize.mzcompose import get_default_system_parameters
 17from materialize.mzcompose.composition import Composition
 18from materialize.mzcompose.services.materialized import DeploymentStatus, Materialized
 19from materialize.mzcompose.services.metadata_store import METADATA_STORE
 20
 21
class MaterializedUpgradeArgs(TypedDict):
    """Arguments for the Materialized service constructor required for 0dt upgrades.

    One dictionary per upgrade-path entry is produced by
    generate_materialized_upgrade_args.
    """

    # Service name; the generator alternates between "mz_1" and "mz_2".
    name: str
    # Container image for the service, or None when no version was given.
    image: str | None
    # Generation number for the service. Required to start services in read only mode.
    deploy_generation: int
    # System parameter defaults passed to the service.
    system_parameter_defaults: dict[str, str]
    # Set by the generator so services share the same metadata store.
    external_metadata_store: bool
    # Container restart policy; the generator uses "on-failure" so the container
    # restarts when it exits due to promotion.
    restart: str
 31
 32
@dataclass
class UpgradeStep:
    """Represents a single upgrade step: the services involved and the action to run."""

    # Service that this step upgrades to.
    new_service: Materialized
    # Service that this step replaces.
    previous_service: Materialized
    # Zero-argument callable that performs the upgrade when invoked.
    upgrade: Callable[[], None]
 40
 41
 42def generate_materialized_upgrade_args(
 43    versions: list[MzVersion | None],
 44) -> list[MaterializedUpgradeArgs]:
 45    """
 46    Constructs a list of required Materialized arguments for 0dt upgrades.
 47    Requires there to be an mz_1 and mz_2 service already in the composition.
 48    """
 49    # We use the first version to get the system parameters since the defaults for
 50    # newer versions include cutting edge features than can break backwards compatibility.
 51    # TODO (multiversion1): Get minimal system parameters by default to avoid cutting edge features.
 52    system_parameter_defaults = get_default_system_parameters(versions[0])
 53
 54    return [
 55        MaterializedUpgradeArgs(
 56            image=f"materialize/materialized:{version}" if version else None,
 57            # Cycle through mz_1 and mz_2 for upgrades since spinning up services have a cost.
 58            name=f"mz_{(i % 2) + 1}",
 59            # Generation number for the service. Required to start services in read only mode.
 60            deploy_generation=i,
 61            system_parameter_defaults=system_parameter_defaults,
 62            # To share the same metadata store between services
 63            external_metadata_store=True,
 64            # To restart when container exits due to promotion
 65            restart="on-failure",
 66        )
 67        for i, version in enumerate(versions)
 68    ]
 69
 70
 71def generate_random_upgrade_path(
 72    versions: list[MzVersion],
 73    rng: Random | None = None,
 74    max_versions: int = 6,
 75) -> list[MzVersion]:
 76    """
 77    Generates a random upgrade path between the given versions.
 78    """
 79    selected_versions = []
 80
 81    rng = rng or Random()
 82    # For each version in the input list, randomly select it with a 1/3 chance.
 83    for v in versions:
 84        if rng.random() < 0.33:
 85            selected_versions.append(v)
 86
 87    # Always include at least one version to avoid empty paths.
 88    if len(selected_versions) == 0:
 89        selected_versions.append(rng.choice(versions))
 90
 91    # Cap the number of versions to keep test runtime bounded.
 92    if len(selected_versions) > max_versions:
 93        selected_versions = sorted(rng.sample(selected_versions, max_versions))
 94
 95    return selected_versions
 96
 97
 98class Materialized0dtUpgrader:
 99    """
100    Manages a sequence of Materialized service upgrades using zero-downtime deployments.
101
102    Args:
103        materialized_services: List of Materialized instances representing each upgrade step
104    """
105
106    def __init__(self, c: Composition, materialized_services: list[Materialized]):
107        self.materialized_services = materialized_services
108        self.c = c
109
110    def create_upgrade_steps_list(self) -> list[UpgradeStep]:
111        """
112        Returns a list of upgrade step actions from the second service onward.
113
114        Each step is a closure that, when called, will perform
115        the upgrade step to the corresponding service.
116        """
117
118        def create_upgrade_action(
119            current_service: Materialized,
120            previous_service: Materialized,
121        ):
122            def upgrade() -> None:
123                with self.c.override(current_service):
124                    current_service_image = (
125                        current_service.config.get("image") or "current"
126                    )
127                    previous_service_image = previous_service.config.get("image")
128
129                    print(f"Bringing up {current_service_image}")
130                    self.c.up(current_service.name)
131                    print(f"Awaiting promotion of {current_service_image}")
132                    self.c.await_mz_deployment_status(
133                        DeploymentStatus.READY_TO_PROMOTE, current_service.name
134                    )
135                    self.c.promote_mz(current_service.name)
136                    print(f"Awaiting leader status of {current_service_image}")
137                    self.c.await_mz_deployment_status(
138                        DeploymentStatus.IS_LEADER, current_service.name
139                    )
140
141                    print(f"Killing {previous_service_image}")
142                    self.c.kill(previous_service.name, wait=True)
143
144            return upgrade
145
146        services = self.materialized_services
147        steps = []
148        for idx in range(1, len(services)):
149            current_service = services[idx]
150            previous_service = services[idx - 1]
151
152            steps.append(
153                UpgradeStep(
154                    new_service=current_service,
155                    previous_service=previous_service,
156                    upgrade=create_upgrade_action(current_service, previous_service),
157                )
158            )
159        return steps
160
161    def initialize(self) -> tuple[Materialized, list[UpgradeStep]]:
162        """
163        Initialize the with the first service. Returns a list where
164        each step is a closure that, when called, will perform the upgrade step to the corresponding service.
165        """
166        first_service = self.materialized_services[0]
167        with self.c.override(first_service):
168            print(f"Bringing up {first_service.name}")
169            self.c.up(first_service.name)
170
171        return first_service, self.create_upgrade_steps_list()
172
173    def print_upgrade_path(self) -> None:
174        """
175        Print the upgrade steps.
176        """
177
178        def image_to_string(image: str | None) -> str:
179            return "current" if image is None else image.split(":")[-1]
180
181        print(
182            f"Upgrade path: {str.join(' -> ', [image_to_string(service.config.get('image')) for service in self.materialized_services])}"
183        )
184
185    def cleanup(self) -> None:
186        """
187        Cleanup after upgrade.
188        """
189        print("Cleaning up upgrade path")
190        # Ensure all services are killed and removed
191        self.c.kill(
192            *[service.name for service in self.materialized_services], wait=True
193        )
194        self.c.rm(
195            *[service.name for service in self.materialized_services],
196            destroy_volumes=True,
197        )
198        self.c.kill(METADATA_STORE, wait=True)
199        self.c.rm(
200            METADATA_STORE,
201            destroy_volumes=True,
202        )
class MaterializedUpgradeArgs(typing.TypedDict):
23class MaterializedUpgradeArgs(TypedDict):
24    """Arguments for the Materialized service constructor required for 0dt upgrades."""
25
26    name: str
27    image: str | None
28    deploy_generation: int
29    system_parameter_defaults: dict[str, str]
30    external_metadata_store: bool
31    restart: str

Arguments for the Materialized service constructor required for 0dt upgrades.

name: str
image: str | None
deploy_generation: int
system_parameter_defaults: dict[str, str]
external_metadata_store: bool
restart: str
@dataclass
class UpgradeStep:
34@dataclass
35class UpgradeStep:
36    """Represents a single upgrade step with its service name and action."""
37
38    new_service: Materialized
39    previous_service: Materialized
40    upgrade: Callable[[], None]

Represents a single upgrade step: the service being upgraded to, the service it replaces, and the action that performs the upgrade.

UpgradeStep( new_service: materialize.mzcompose.services.materialized.Materialized, previous_service: materialize.mzcompose.services.materialized.Materialized, upgrade: Callable[[], None])
new_service: materialize.mzcompose.services.materialized.Materialized
previous_service: materialize.mzcompose.services.materialized.Materialized
upgrade: Callable[[], None]
def generate_materialized_upgrade_args( versions: list[materialize.mz_version.MzVersion | None]) -> list[MaterializedUpgradeArgs]:
43def generate_materialized_upgrade_args(
44    versions: list[MzVersion | None],
45) -> list[MaterializedUpgradeArgs]:
46    """
47    Constructs a list of required Materialized arguments for 0dt upgrades.
48    Requires there to be an mz_1 and mz_2 service already in the composition.
49    """
50    # We use the first version to get the system parameters since the defaults for
51    # newer versions include cutting edge features than can break backwards compatibility.
52    # TODO (multiversion1): Get minimal system parameters by default to avoid cutting edge features.
53    system_parameter_defaults = get_default_system_parameters(versions[0])
54
55    return [
56        MaterializedUpgradeArgs(
57            image=f"materialize/materialized:{version}" if version else None,
58            # Cycle through mz_1 and mz_2 for upgrades since spinning up services have a cost.
59            name=f"mz_{(i % 2) + 1}",
60            # Generation number for the service. Required to start services in read only mode.
61            deploy_generation=i,
62            system_parameter_defaults=system_parameter_defaults,
63            # To share the same metadata store between services
64            external_metadata_store=True,
65            # To restart when container exits due to promotion
66            restart="on-failure",
67        )
68        for i, version in enumerate(versions)
69    ]

Constructs a list of required Materialized arguments for 0dt upgrades. Requires there to be an mz_1 and mz_2 service already in the composition.

def generate_random_upgrade_path( versions: list[materialize.mz_version.MzVersion], rng: random.Random | None = None, max_versions: int = 6) -> list[materialize.mz_version.MzVersion]:
72def generate_random_upgrade_path(
73    versions: list[MzVersion],
74    rng: Random | None = None,
75    max_versions: int = 6,
76) -> list[MzVersion]:
77    """
78    Generates a random upgrade path between the given versions.
79    """
80    selected_versions = []
81
82    rng = rng or Random()
83    # For each version in the input list, randomly select it with a 1/3 chance.
84    for v in versions:
85        if rng.random() < 0.33:
86            selected_versions.append(v)
87
88    # Always include at least one version to avoid empty paths.
89    if len(selected_versions) == 0:
90        selected_versions.append(rng.choice(versions))
91
92    # Cap the number of versions to keep test runtime bounded.
93    if len(selected_versions) > max_versions:
94        selected_versions = sorted(rng.sample(selected_versions, max_versions))
95
96    return selected_versions

Generates a random upgrade path between the given versions.

class Materialized0dtUpgrader:
 99class Materialized0dtUpgrader:
100    """
101    Manages a sequence of Materialized service upgrades using zero-downtime deployments.
102
103    Args:
104        materialized_services: List of Materialized instances representing each upgrade step
105    """
106
107    def __init__(self, c: Composition, materialized_services: list[Materialized]):
108        self.materialized_services = materialized_services
109        self.c = c
110
111    def create_upgrade_steps_list(self) -> list[UpgradeStep]:
112        """
113        Returns a list of upgrade step actions from the second service onward.
114
115        Each step is a closure that, when called, will perform
116        the upgrade step to the corresponding service.
117        """
118
119        def create_upgrade_action(
120            current_service: Materialized,
121            previous_service: Materialized,
122        ):
123            def upgrade() -> None:
124                with self.c.override(current_service):
125                    current_service_image = (
126                        current_service.config.get("image") or "current"
127                    )
128                    previous_service_image = previous_service.config.get("image")
129
130                    print(f"Bringing up {current_service_image}")
131                    self.c.up(current_service.name)
132                    print(f"Awaiting promotion of {current_service_image}")
133                    self.c.await_mz_deployment_status(
134                        DeploymentStatus.READY_TO_PROMOTE, current_service.name
135                    )
136                    self.c.promote_mz(current_service.name)
137                    print(f"Awaiting leader status of {current_service_image}")
138                    self.c.await_mz_deployment_status(
139                        DeploymentStatus.IS_LEADER, current_service.name
140                    )
141
142                    print(f"Killing {previous_service_image}")
143                    self.c.kill(previous_service.name, wait=True)
144
145            return upgrade
146
147        services = self.materialized_services
148        steps = []
149        for idx in range(1, len(services)):
150            current_service = services[idx]
151            previous_service = services[idx - 1]
152
153            steps.append(
154                UpgradeStep(
155                    new_service=current_service,
156                    previous_service=previous_service,
157                    upgrade=create_upgrade_action(current_service, previous_service),
158                )
159            )
160        return steps
161
162    def initialize(self) -> tuple[Materialized, list[UpgradeStep]]:
163        """
164        Initialize the with the first service. Returns a list where
165        each step is a closure that, when called, will perform the upgrade step to the corresponding service.
166        """
167        first_service = self.materialized_services[0]
168        with self.c.override(first_service):
169            print(f"Bringing up {first_service.name}")
170            self.c.up(first_service.name)
171
172        return first_service, self.create_upgrade_steps_list()
173
174    def print_upgrade_path(self) -> None:
175        """
176        Print the upgrade steps.
177        """
178
179        def image_to_string(image: str | None) -> str:
180            return "current" if image is None else image.split(":")[-1]
181
182        print(
183            f"Upgrade path: {str.join(' -> ', [image_to_string(service.config.get('image')) for service in self.materialized_services])}"
184        )
185
186    def cleanup(self) -> None:
187        """
188        Cleanup after upgrade.
189        """
190        print("Cleaning up upgrade path")
191        # Ensure all services are killed and removed
192        self.c.kill(
193            *[service.name for service in self.materialized_services], wait=True
194        )
195        self.c.rm(
196            *[service.name for service in self.materialized_services],
197            destroy_volumes=True,
198        )
199        self.c.kill(METADATA_STORE, wait=True)
200        self.c.rm(
201            METADATA_STORE,
202            destroy_volumes=True,
203        )

Manages a sequence of Materialized service upgrades using zero-downtime deployments.

Args: materialized_services: List of Materialized instances representing each upgrade step

Materialized0dtUpgrader( c: materialize.mzcompose.composition.Composition, materialized_services: list[materialize.mzcompose.services.materialized.Materialized])
107    def __init__(self, c: Composition, materialized_services: list[Materialized]):
108        self.materialized_services = materialized_services
109        self.c = c
materialized_services
c
def create_upgrade_steps_list(self) -> list[UpgradeStep]:
111    def create_upgrade_steps_list(self) -> list[UpgradeStep]:
112        """
113        Returns a list of upgrade step actions from the second service onward.
114
115        Each step is a closure that, when called, will perform
116        the upgrade step to the corresponding service.
117        """
118
119        def create_upgrade_action(
120            current_service: Materialized,
121            previous_service: Materialized,
122        ):
123            def upgrade() -> None:
124                with self.c.override(current_service):
125                    current_service_image = (
126                        current_service.config.get("image") or "current"
127                    )
128                    previous_service_image = previous_service.config.get("image")
129
130                    print(f"Bringing up {current_service_image}")
131                    self.c.up(current_service.name)
132                    print(f"Awaiting promotion of {current_service_image}")
133                    self.c.await_mz_deployment_status(
134                        DeploymentStatus.READY_TO_PROMOTE, current_service.name
135                    )
136                    self.c.promote_mz(current_service.name)
137                    print(f"Awaiting leader status of {current_service_image}")
138                    self.c.await_mz_deployment_status(
139                        DeploymentStatus.IS_LEADER, current_service.name
140                    )
141
142                    print(f"Killing {previous_service_image}")
143                    self.c.kill(previous_service.name, wait=True)
144
145            return upgrade
146
147        services = self.materialized_services
148        steps = []
149        for idx in range(1, len(services)):
150            current_service = services[idx]
151            previous_service = services[idx - 1]
152
153            steps.append(
154                UpgradeStep(
155                    new_service=current_service,
156                    previous_service=previous_service,
157                    upgrade=create_upgrade_action(current_service, previous_service),
158                )
159            )
160        return steps

Returns a list of upgrade step actions from the second service onward.

Each step is a closure that, when called, will perform the upgrade step to the corresponding service.

def initialize( self) -> tuple[materialize.mzcompose.services.materialized.Materialized, list[UpgradeStep]]:
162    def initialize(self) -> tuple[Materialized, list[UpgradeStep]]:
163        """
164        Initialize the with the first service. Returns a list where
165        each step is a closure that, when called, will perform the upgrade step to the corresponding service.
166        """
167        first_service = self.materialized_services[0]
168        with self.c.override(first_service):
169            print(f"Bringing up {first_service.name}")
170            self.c.up(first_service.name)
171
172        return first_service, self.create_upgrade_steps_list()

Initialize the upgrade path by bringing up the first service. Returns the first service and a list where each step is a closure that, when called, will perform the upgrade step to the corresponding service.

def print_upgrade_path(self) -> None:
174    def print_upgrade_path(self) -> None:
175        """
176        Print the upgrade steps.
177        """
178
179        def image_to_string(image: str | None) -> str:
180            return "current" if image is None else image.split(":")[-1]
181
182        print(
183            f"Upgrade path: {str.join(' -> ', [image_to_string(service.config.get('image')) for service in self.materialized_services])}"
184        )

Print the upgrade steps.

def cleanup(self) -> None:
186    def cleanup(self) -> None:
187        """
188        Cleanup after upgrade.
189        """
190        print("Cleaning up upgrade path")
191        # Ensure all services are killed and removed
192        self.c.kill(
193            *[service.name for service in self.materialized_services], wait=True
194        )
195        self.c.rm(
196            *[service.name for service in self.materialized_services],
197            destroy_volumes=True,
198        )
199        self.c.kill(METADATA_STORE, wait=True)
200        self.c.rm(
201            METADATA_STORE,
202            destroy_volumes=True,
203        )

Cleanup after upgrade.