misc.python.materialize.mz_0dt_upgrader

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10from collections.abc import Callable
 11from dataclasses import dataclass
 12from random import Random
 13from typing import TypedDict
 14
 15from materialize.mz_version import MzVersion
 16from materialize.mzcompose import get_default_system_parameters
 17from materialize.mzcompose.composition import Composition
 18from materialize.mzcompose.services.materialized import DeploymentStatus, Materialized
 19from materialize.mzcompose.services.metadata_store import METADATA_STORE
 20
 21
 22class MaterializedUpgradeArgs(TypedDict):
 23    """Arguments for the Materialized service constructor required for 0dt upgrades."""
 24
 25    name: str
 26    image: str | None
 27    deploy_generation: int
 28    system_parameter_defaults: dict[str, str]
 29    external_metadata_store: bool
 30    restart: str
 31
 32
 33@dataclass
 34class UpgradeStep:
 35    """Represents a single upgrade step with its service name and action."""
 36
 37    new_service: Materialized
 38    previous_service: Materialized
 39    upgrade: Callable[[], None]
 40
 41
 42def generate_materialized_upgrade_args(
 43    versions: list[MzVersion | None],
 44) -> list[MaterializedUpgradeArgs]:
 45    """
 46    Constructs a list of required Materialized arguments for 0dt upgrades.
 47    Requires there to be an mz_1 and mz_2 service already in the composition.
 48    """
 49    # We use the first version to get the system parameters since the defaults for
 50    # newer versions include cutting edge features than can break backwards compatibility.
 51    # TODO (multiversion1): Get minimal system parameters by default to avoid cutting edge features.
 52    system_parameter_defaults = get_default_system_parameters(versions[0])
 53
 54    return [
 55        MaterializedUpgradeArgs(
 56            image=f"materialize/materialized:{version}" if version else None,
 57            # Cycle through mz_1 and mz_2 for upgrades since spinning up services have a cost.
 58            name=f"mz_{(i % 2) + 1}",
 59            # Generation number for the service. Required to start services in read only mode.
 60            deploy_generation=i,
 61            system_parameter_defaults=system_parameter_defaults,
 62            # To share the same metadata store between services
 63            external_metadata_store=True,
 64            # To restart when container exits due to promotion
 65            restart="on-failure",
 66        )
 67        for i, version in enumerate(versions)
 68    ]
 69
 70
 71def generate_random_upgrade_path(
 72    versions: list[MzVersion],
 73    rng: Random | None = None,
 74) -> list[MzVersion]:
 75    """
 76    Generates a random upgrade path between the given versions.
 77    """
 78    selected_versions = []
 79
 80    rng = rng or Random()
 81    # For each version in the input list, randomly select it with a 50% chance.
 82    for v in versions:
 83        if rng.random() < 0.5:
 84            selected_versions.append(v)
 85
 86    # Always include at least one version to avoid empty paths.
 87    if len(selected_versions) == 0:
 88        selected_versions.append(rng.choice(versions))
 89
 90    return selected_versions
 91
 92
 93class Materialized0dtUpgrader:
 94    """
 95    Manages a sequence of Materialized service upgrades using zero-downtime deployments.
 96
 97    Args:
 98        materialized_services: List of Materialized instances representing each upgrade step
 99    """
100
101    def __init__(self, c: Composition, materialized_services: list[Materialized]):
102        self.materialized_services = materialized_services
103        self.c = c
104
105    def create_upgrade_steps_list(self) -> list[UpgradeStep]:
106        """
107        Returns a list of upgrade step actions from the second service onward.
108
109        Each step is a closure that, when called, will perform
110        the upgrade step to the corresponding service.
111        """
112
113        def create_upgrade_action(
114            current_service: Materialized,
115            previous_service: Materialized,
116        ):
117            def upgrade() -> None:
118                with self.c.override(current_service):
119                    current_service_image = (
120                        current_service.config.get("image") or "current"
121                    )
122                    previous_service_image = previous_service.config.get("image")
123
124                    print(f"Bringing up {current_service_image}")
125                    self.c.up(current_service.name)
126                    print(f"Awaiting promotion of {current_service_image}")
127                    self.c.await_mz_deployment_status(
128                        DeploymentStatus.READY_TO_PROMOTE, current_service.name
129                    )
130                    self.c.promote_mz(current_service.name)
131                    print(f"Awaiting leader status of {current_service_image}")
132                    self.c.await_mz_deployment_status(
133                        DeploymentStatus.IS_LEADER, current_service.name
134                    )
135
136                    print(f"Killing {previous_service_image}")
137                    self.c.kill(previous_service.name, wait=True)
138
139            return upgrade
140
141        services = self.materialized_services
142        steps = []
143        for idx in range(1, len(services)):
144            current_service = services[idx]
145            previous_service = services[idx - 1]
146
147            steps.append(
148                UpgradeStep(
149                    new_service=current_service,
150                    previous_service=previous_service,
151                    upgrade=create_upgrade_action(current_service, previous_service),
152                )
153            )
154        return steps
155
156    def initialize(self) -> tuple[Materialized, list[UpgradeStep]]:
157        """
158        Initialize the with the first service. Returns a list where
159        each step is a closure that, when called, will perform the upgrade step to the corresponding service.
160        """
161        first_service = self.materialized_services[0]
162        with self.c.override(first_service):
163            print(f"Bringing up {first_service.name}")
164            self.c.up(first_service.name)
165
166        return first_service, self.create_upgrade_steps_list()
167
168    def print_upgrade_path(self) -> None:
169        """
170        Print the upgrade steps.
171        """
172
173        def image_to_string(image: str | None) -> str:
174            return "current" if image is None else image.split(":")[-1]
175
176        print(
177            f"Upgrade path: {str.join(' -> ', [image_to_string(service.config.get('image')) for service in self.materialized_services])}"
178        )
179
180    def cleanup(self) -> None:
181        """
182        Cleanup after upgrade.
183        """
184        print("Cleaning up upgrade path")
185        # Ensure all services are killed and removed
186        self.c.kill(
187            *[service.name for service in self.materialized_services], wait=True
188        )
189        self.c.rm(
190            *[service.name for service in self.materialized_services],
191            destroy_volumes=True,
192        )
193        self.c.kill(METADATA_STORE, wait=True)
194        self.c.rm(
195            METADATA_STORE,
196            destroy_volumes=True,
197        )
class MaterializedUpgradeArgs(typing.TypedDict):
23class MaterializedUpgradeArgs(TypedDict):
24    """Arguments for the Materialized service constructor required for 0dt upgrades."""
25
26    name: str
27    image: str | None
28    deploy_generation: int
29    system_parameter_defaults: dict[str, str]
30    external_metadata_store: bool
31    restart: str

Arguments for the Materialized service constructor required for 0dt upgrades.

name: str
image: str | None
deploy_generation: int
system_parameter_defaults: dict[str, str]
external_metadata_store: bool
restart: str
@dataclass
class UpgradeStep:
34@dataclass
35class UpgradeStep:
36    """Represents a single upgrade step with its service name and action."""
37
38    new_service: Materialized
39    previous_service: Materialized
40    upgrade: Callable[[], None]

Represents a single upgrade step with its service name and action.

UpgradeStep( new_service: materialize.mzcompose.services.materialized.Materialized, previous_service: materialize.mzcompose.services.materialized.Materialized, upgrade: Callable[[], None])
new_service: materialize.mzcompose.services.materialized.Materialized
previous_service: materialize.mzcompose.services.materialized.Materialized
upgrade: Callable[[], None]
def generate_materialized_upgrade_args( versions: list[materialize.mz_version.MzVersion | None]) -> list[MaterializedUpgradeArgs]:
43def generate_materialized_upgrade_args(
44    versions: list[MzVersion | None],
45) -> list[MaterializedUpgradeArgs]:
46    """
47    Constructs a list of required Materialized arguments for 0dt upgrades.
48    Requires there to be an mz_1 and mz_2 service already in the composition.
49    """
50    # We use the first version to get the system parameters since the defaults for
51    # newer versions include cutting edge features than can break backwards compatibility.
52    # TODO (multiversion1): Get minimal system parameters by default to avoid cutting edge features.
53    system_parameter_defaults = get_default_system_parameters(versions[0])
54
55    return [
56        MaterializedUpgradeArgs(
57            image=f"materialize/materialized:{version}" if version else None,
58            # Cycle through mz_1 and mz_2 for upgrades since spinning up services have a cost.
59            name=f"mz_{(i % 2) + 1}",
60            # Generation number for the service. Required to start services in read only mode.
61            deploy_generation=i,
62            system_parameter_defaults=system_parameter_defaults,
63            # To share the same metadata store between services
64            external_metadata_store=True,
65            # To restart when container exits due to promotion
66            restart="on-failure",
67        )
68        for i, version in enumerate(versions)
69    ]

Constructs a list of required Materialized arguments for 0dt upgrades. Requires there to be an mz_1 and mz_2 service already in the composition.

def generate_random_upgrade_path( versions: list[materialize.mz_version.MzVersion], rng: random.Random | None = None) -> list[materialize.mz_version.MzVersion]:
72def generate_random_upgrade_path(
73    versions: list[MzVersion],
74    rng: Random | None = None,
75) -> list[MzVersion]:
76    """
77    Generates a random upgrade path between the given versions.
78    """
79    selected_versions = []
80
81    rng = rng or Random()
82    # For each version in the input list, randomly select it with a 50% chance.
83    for v in versions:
84        if rng.random() < 0.5:
85            selected_versions.append(v)
86
87    # Always include at least one version to avoid empty paths.
88    if len(selected_versions) == 0:
89        selected_versions.append(rng.choice(versions))
90
91    return selected_versions

Generates a random upgrade path between the given versions.

class Materialized0dtUpgrader:
 94class Materialized0dtUpgrader:
 95    """
 96    Manages a sequence of Materialized service upgrades using zero-downtime deployments.
 97
 98    Args:
 99        materialized_services: List of Materialized instances representing each upgrade step
100    """
101
102    def __init__(self, c: Composition, materialized_services: list[Materialized]):
103        self.materialized_services = materialized_services
104        self.c = c
105
106    def create_upgrade_steps_list(self) -> list[UpgradeStep]:
107        """
108        Returns a list of upgrade step actions from the second service onward.
109
110        Each step is a closure that, when called, will perform
111        the upgrade step to the corresponding service.
112        """
113
114        def create_upgrade_action(
115            current_service: Materialized,
116            previous_service: Materialized,
117        ):
118            def upgrade() -> None:
119                with self.c.override(current_service):
120                    current_service_image = (
121                        current_service.config.get("image") or "current"
122                    )
123                    previous_service_image = previous_service.config.get("image")
124
125                    print(f"Bringing up {current_service_image}")
126                    self.c.up(current_service.name)
127                    print(f"Awaiting promotion of {current_service_image}")
128                    self.c.await_mz_deployment_status(
129                        DeploymentStatus.READY_TO_PROMOTE, current_service.name
130                    )
131                    self.c.promote_mz(current_service.name)
132                    print(f"Awaiting leader status of {current_service_image}")
133                    self.c.await_mz_deployment_status(
134                        DeploymentStatus.IS_LEADER, current_service.name
135                    )
136
137                    print(f"Killing {previous_service_image}")
138                    self.c.kill(previous_service.name, wait=True)
139
140            return upgrade
141
142        services = self.materialized_services
143        steps = []
144        for idx in range(1, len(services)):
145            current_service = services[idx]
146            previous_service = services[idx - 1]
147
148            steps.append(
149                UpgradeStep(
150                    new_service=current_service,
151                    previous_service=previous_service,
152                    upgrade=create_upgrade_action(current_service, previous_service),
153                )
154            )
155        return steps
156
157    def initialize(self) -> tuple[Materialized, list[UpgradeStep]]:
158        """
159        Initialize the with the first service. Returns a list where
160        each step is a closure that, when called, will perform the upgrade step to the corresponding service.
161        """
162        first_service = self.materialized_services[0]
163        with self.c.override(first_service):
164            print(f"Bringing up {first_service.name}")
165            self.c.up(first_service.name)
166
167        return first_service, self.create_upgrade_steps_list()
168
169    def print_upgrade_path(self) -> None:
170        """
171        Print the upgrade steps.
172        """
173
174        def image_to_string(image: str | None) -> str:
175            return "current" if image is None else image.split(":")[-1]
176
177        print(
178            f"Upgrade path: {str.join(' -> ', [image_to_string(service.config.get('image')) for service in self.materialized_services])}"
179        )
180
181    def cleanup(self) -> None:
182        """
183        Cleanup after upgrade.
184        """
185        print("Cleaning up upgrade path")
186        # Ensure all services are killed and removed
187        self.c.kill(
188            *[service.name for service in self.materialized_services], wait=True
189        )
190        self.c.rm(
191            *[service.name for service in self.materialized_services],
192            destroy_volumes=True,
193        )
194        self.c.kill(METADATA_STORE, wait=True)
195        self.c.rm(
196            METADATA_STORE,
197            destroy_volumes=True,
198        )

Manages a sequence of Materialized service upgrades using zero-downtime deployments.

Args: materialized_services: List of Materialized instances representing each upgrade step

Materialized0dtUpgrader( c: materialize.mzcompose.composition.Composition, materialized_services: list[materialize.mzcompose.services.materialized.Materialized])
102    def __init__(self, c: Composition, materialized_services: list[Materialized]):
103        self.materialized_services = materialized_services
104        self.c = c
materialized_services
c
def create_upgrade_steps_list(self) -> list[UpgradeStep]:
106    def create_upgrade_steps_list(self) -> list[UpgradeStep]:
107        """
108        Returns a list of upgrade step actions from the second service onward.
109
110        Each step is a closure that, when called, will perform
111        the upgrade step to the corresponding service.
112        """
113
114        def create_upgrade_action(
115            current_service: Materialized,
116            previous_service: Materialized,
117        ):
118            def upgrade() -> None:
119                with self.c.override(current_service):
120                    current_service_image = (
121                        current_service.config.get("image") or "current"
122                    )
123                    previous_service_image = previous_service.config.get("image")
124
125                    print(f"Bringing up {current_service_image}")
126                    self.c.up(current_service.name)
127                    print(f"Awaiting promotion of {current_service_image}")
128                    self.c.await_mz_deployment_status(
129                        DeploymentStatus.READY_TO_PROMOTE, current_service.name
130                    )
131                    self.c.promote_mz(current_service.name)
132                    print(f"Awaiting leader status of {current_service_image}")
133                    self.c.await_mz_deployment_status(
134                        DeploymentStatus.IS_LEADER, current_service.name
135                    )
136
137                    print(f"Killing {previous_service_image}")
138                    self.c.kill(previous_service.name, wait=True)
139
140            return upgrade
141
142        services = self.materialized_services
143        steps = []
144        for idx in range(1, len(services)):
145            current_service = services[idx]
146            previous_service = services[idx - 1]
147
148            steps.append(
149                UpgradeStep(
150                    new_service=current_service,
151                    previous_service=previous_service,
152                    upgrade=create_upgrade_action(current_service, previous_service),
153                )
154            )
155        return steps

Returns a list of upgrade step actions from the second service onward.

Each step is a closure that, when called, will perform the upgrade step to the corresponding service.

def initialize( self) -> tuple[materialize.mzcompose.services.materialized.Materialized, list[UpgradeStep]]:
157    def initialize(self) -> tuple[Materialized, list[UpgradeStep]]:
158        """
159        Initialize the with the first service. Returns a list where
160        each step is a closure that, when called, will perform the upgrade step to the corresponding service.
161        """
162        first_service = self.materialized_services[0]
163        with self.c.override(first_service):
164            print(f"Bringing up {first_service.name}")
165            self.c.up(first_service.name)
166
167        return first_service, self.create_upgrade_steps_list()

Initialize the with the first service. Returns a list where each step is a closure that, when called, will perform the upgrade step to the corresponding service.

def print_upgrade_path(self) -> None:
169    def print_upgrade_path(self) -> None:
170        """
171        Print the upgrade steps.
172        """
173
174        def image_to_string(image: str | None) -> str:
175            return "current" if image is None else image.split(":")[-1]
176
177        print(
178            f"Upgrade path: {str.join(' -> ', [image_to_string(service.config.get('image')) for service in self.materialized_services])}"
179        )

Print the upgrade steps.

def cleanup(self) -> None:
181    def cleanup(self) -> None:
182        """
183        Cleanup after upgrade.
184        """
185        print("Cleaning up upgrade path")
186        # Ensure all services are killed and removed
187        self.c.kill(
188            *[service.name for service in self.materialized_services], wait=True
189        )
190        self.c.rm(
191            *[service.name for service in self.materialized_services],
192            destroy_volumes=True,
193        )
194        self.c.kill(METADATA_STORE, wait=True)
195        self.c.rm(
196            METADATA_STORE,
197            destroy_volumes=True,
198        )

Cleanup after upgrade.