misc.python.materialize.zippy.mz_actions

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10
 11import os
 12
 13from materialize.mzcompose.composition import Composition
 14from materialize.mzcompose.services.materialized import (
 15    LEADER_STATUS_HEALTHCHECK,
 16    DeploymentStatus,
 17    Materialized,
 18)
 19from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
 20from materialize.zippy.blob_store_capabilities import BlobStoreIsRunning
 21from materialize.zippy.crdb_capabilities import CockroachIsRunning
 22from materialize.zippy.framework import (
 23    Action,
 24    ActionFactory,
 25    Capabilities,
 26    Capability,
 27    Mz0dtDeployBaseAction,
 28    State,
 29)
 30from materialize.zippy.mz_capabilities import MzIsRunning
 31from materialize.zippy.view_capabilities import ViewExists
 32
 33
 34class MzStartParameterized(ActionFactory):
 35    """Starts a Mz instance with custom paramters."""
 36
 37    @classmethod
 38    def requires(cls) -> set[type[Capability]]:
 39        return {CockroachIsRunning, BlobStoreIsRunning}
 40
 41    @classmethod
 42    def incompatible_with(cls) -> set[type[Capability]]:
 43        return {MzIsRunning}
 44
 45    def __init__(
 46        self, additional_system_parameter_defaults: dict[str, str] = {}
 47    ) -> None:
 48        self.additional_system_parameter_defaults = additional_system_parameter_defaults
 49
 50    def new(self, capabilities: Capabilities) -> list[Action]:
 51        return [
 52            MzStart(
 53                capabilities=capabilities,
 54                additional_system_parameter_defaults=self.additional_system_parameter_defaults,
 55            )
 56        ]
 57
 58
 59class MzStart(Action):
 60    """Starts a Mz instance (all components are running in the same container)."""
 61
 62    @classmethod
 63    def requires(cls) -> set[type[Capability]]:
 64        return {CockroachIsRunning, BlobStoreIsRunning}
 65
 66    @classmethod
 67    def incompatible_with(cls) -> set[type[Capability]]:
 68        return {MzIsRunning}
 69
 70    def __init__(
 71        self,
 72        capabilities: Capabilities,
 73        additional_system_parameter_defaults: dict[str, str] = {},
 74    ) -> None:
 75        if additional_system_parameter_defaults:
 76            self.additional_system_parameter_defaults = (
 77                additional_system_parameter_defaults
 78            )
 79        else:
 80            self.additional_system_parameter_defaults = {}
 81            system_parameter_default = os.getenv("CI_MZ_SYSTEM_PARAMETER_DEFAULT", "")
 82            if system_parameter_default:
 83                for val in system_parameter_default.split(";"):
 84                    x = val.split("=", maxsplit=1)
 85                    assert (
 86                        len(x) == 2
 87                    ), f"CI_MZ_SYSTEM_PARAMETER_DEFAULT '{val}' should be the format <key>=<val>"
 88                    self.additional_system_parameter_defaults[x[0]] = x[1]
 89
 90        super().__init__(capabilities)
 91
 92    def run(self, c: Composition, state: State) -> None:
 93        print(
 94            f"Starting Mz with additional_system_parameter_defaults = {self.additional_system_parameter_defaults}"
 95        )
 96
 97        with c.override(
 98            Materialized(
 99                name=state.mz_service,
100                external_blob_store=True,
101                blob_store_is_azure=c.blob_store() == "azurite",
102                external_metadata_store=True,
103                deploy_generation=state.deploy_generation,
104                system_parameter_defaults=state.system_parameter_defaults,
105                sanity_restart=False,
106                restart="on-failure",
107                additional_system_parameter_defaults=self.additional_system_parameter_defaults,
108                metadata_store="cockroach",
109                default_replication_factor=2,
110            )
111        ):
112            c.up(state.mz_service)
113
114        for config_param in [
115            "max_tables",
116            "max_sources",
117            "max_objects_per_schema",
118            "max_materialized_views",
119            "max_sinks",
120        ]:
121            c.sql(
122                f"ALTER SYSTEM SET {config_param} TO 1000",
123                user="mz_system",
124                port=6877,
125                print_statement=False,
126                service=state.mz_service,
127            )
128
129        c.sql(
130            """
131            ALTER CLUSTER quickstart SET (MANAGED = false);
132            """,
133            user="mz_system",
134            port=6877,
135            service=state.mz_service,
136        )
137
138        # Make sure all eligible LIMIT queries use the PeekPersist optimization
139        c.sql(
140            "ALTER SYSTEM SET persist_fast_path_limit = 1000000000",
141            user="mz_system",
142            port=6877,
143            service=state.mz_service,
144        )
145
146    def provides(self) -> list[Capability]:
147        return [MzIsRunning()]
148
149
class MzStop(Action):
    """Stops the entire Mz instance (all components are running in the same container)."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Compute the capability classes that this action requires."""
        # Killing Mz does not strictly need balancerd to be up. Requiring it
        # anyway avoids the frequent situation where both are down and Zippy
        # spends a prolonged period restarting one or the other, with no other
        # useful work being performed in the meantime.
        required: set[type[Capability]] = {BalancerdIsRunning, MzIsRunning}
        return required

    def withholds(self) -> set[type[Capability]]:
        """Compute the capability classes that this action will make unavailable."""
        return {MzIsRunning}

    def run(self, c: Composition, state: State) -> None:
        """Run this action on the provided composition by killing the Mz service."""
        service_name = state.mz_service
        c.kill(service_name)
166
167
class MzRestart(Action):
    """Restarts the entire Mz instance (all components are running in the same container)."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Compute the capability classes that this action requires."""
        return {MzIsRunning}

    def run(self, c: Composition, state: State) -> None:
        """Run this action on the provided composition.

        Kills the service named by `state.mz_service` and brings it back up,
        both while the service definition below is overridden.
        """
        service_definition = Materialized(
            name=state.mz_service,
            external_blob_store=True,
            blob_store_is_azure=c.blob_store() == "azurite",
            external_metadata_store=True,
            deploy_generation=state.deploy_generation,
            system_parameter_defaults=state.system_parameter_defaults,
            sanity_restart=False,
            restart="on-failure",
            metadata_store="cockroach",
            default_replication_factor=2,
        )

        with c.override(service_definition):
            c.kill(state.mz_service)
            c.up(state.mz_service)
192
193
class Mz0dtDeploy(Mz0dtDeployBaseAction):
    """Switches Mz to a new deployment using 0dt."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Compute the capability classes that this action requires."""
        return {MzIsRunning}

    def run(self, c: Composition, state: State) -> None:
        """Run this action on the provided composition.

        Bumps the deploy generation, starts the service chosen for that
        generation, waits until it is ready, promotes it, waits until it is
        the leader, and finally stops the other service.
        """
        state.deploy_generation += 1

        # Even generations run on "materialized", odd ones on "materialized2".
        if state.deploy_generation % 2 == 0:
            state.mz_service = "materialized"
        else:
            state.mz_service = "materialized2"

        print(f"Deploying generation {state.deploy_generation} on {state.mz_service}")

        new_deployment = Materialized(
            name=state.mz_service,
            external_blob_store=True,
            blob_store_is_azure=c.blob_store() == "azurite",
            external_metadata_store=True,
            deploy_generation=state.deploy_generation,
            system_parameter_defaults=state.system_parameter_defaults,
            sanity_restart=False,
            restart="on-failure",
            healthcheck=LEADER_STATUS_HEALTHCHECK,
            metadata_store="cockroach",
            default_replication_factor=2,
        )

        with c.override(new_deployment):
            c.up(state.mz_service, detach=True)
            c.await_mz_deployment_status(
                DeploymentStatus.READY_TO_PROMOTE, state.mz_service
            )
            c.promote_mz(state.mz_service)
            c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, state.mz_service)

            # Stop whichever of the two services is not the new deployment.
            if state.mz_service == "materialized":
                other_service = "materialized2"
            else:
                other_service = "materialized"
            c.stop(other_service, wait=True)
239
240
class KillClusterd(Action):
    """Kills the clusterd processes in the environmentd container. The process orchestrator will restart them."""

    @classmethod
    def requires(cls) -> set[type[Capability]]:
        """Compute the capability classes that this action requires."""
        return {MzIsRunning, ViewExists}

    def run(self, c: Composition, state: State) -> None:
        """Run this action on the provided composition."""
        # The trailing `|| true` is there because, depending on the workload,
        # clusterd may not be running at all.
        kill_command = "kill -9 `pidof clusterd` || true"
        c.exec(state.mz_service, "bash", "-c", kill_command)
class MzStartParameterized(materialize.zippy.framework.ActionFactory):
35class MzStartParameterized(ActionFactory):
36    """Starts a Mz instance with custom parameters."""
37
38    @classmethod
39    def requires(cls) -> set[type[Capability]]:
40        return {CockroachIsRunning, BlobStoreIsRunning}
41
42    @classmethod
43    def incompatible_with(cls) -> set[type[Capability]]:
44        return {MzIsRunning}
45
46    def __init__(
47        self, additional_system_parameter_defaults: dict[str, str] = {}
48    ) -> None:
49        self.additional_system_parameter_defaults = additional_system_parameter_defaults
50
51    def new(self, capabilities: Capabilities) -> list[Action]:
52        return [
53            MzStart(
54                capabilities=capabilities,
55                additional_system_parameter_defaults=self.additional_system_parameter_defaults,
56            )
57        ]

Starts a Mz instance with custom parameters.

MzStartParameterized(additional_system_parameter_defaults: dict[str, str] = {})
46    def __init__(
47        self, additional_system_parameter_defaults: dict[str, str] = {}
48    ) -> None:
49        self.additional_system_parameter_defaults = additional_system_parameter_defaults
@classmethod
def requires(cls) -> set[type[materialize.zippy.framework.Capability]]:
38    @classmethod
39    def requires(cls) -> set[type[Capability]]:
40        return {CockroachIsRunning, BlobStoreIsRunning}

Compute the capability classes that this Action Factory requires.

@classmethod
def incompatible_with(cls) -> set[type[materialize.zippy.framework.Capability]]:
42    @classmethod
43    def incompatible_with(cls) -> set[type[Capability]]:
44        return {MzIsRunning}

The capability classes that this action is not compatible with.

additional_system_parameter_defaults
def new( self, capabilities: materialize.zippy.framework.Capabilities) -> list[materialize.zippy.framework.Action]:
51    def new(self, capabilities: Capabilities) -> list[Action]:
52        return [
53            MzStart(
54                capabilities=capabilities,
55                additional_system_parameter_defaults=self.additional_system_parameter_defaults,
56            )
57        ]
class MzStart(materialize.zippy.framework.Action):
 60class MzStart(Action):
 61    """Starts a Mz instance (all components are running in the same container)."""
 62
 63    @classmethod
 64    def requires(cls) -> set[type[Capability]]:
 65        return {CockroachIsRunning, BlobStoreIsRunning}
 66
 67    @classmethod
 68    def incompatible_with(cls) -> set[type[Capability]]:
 69        return {MzIsRunning}
 70
 71    def __init__(
 72        self,
 73        capabilities: Capabilities,
 74        additional_system_parameter_defaults: dict[str, str] = {},
 75    ) -> None:
 76        if additional_system_parameter_defaults:
 77            self.additional_system_parameter_defaults = (
 78                additional_system_parameter_defaults
 79            )
 80        else:
 81            self.additional_system_parameter_defaults = {}
 82            system_parameter_default = os.getenv("CI_MZ_SYSTEM_PARAMETER_DEFAULT", "")
 83            if system_parameter_default:
 84                for val in system_parameter_default.split(";"):
 85                    x = val.split("=", maxsplit=1)
 86                    assert (
 87                        len(x) == 2
 88                    ), f"CI_MZ_SYSTEM_PARAMETER_DEFAULT '{val}' should be the format <key>=<val>"
 89                    self.additional_system_parameter_defaults[x[0]] = x[1]
 90
 91        super().__init__(capabilities)
 92
 93    def run(self, c: Composition, state: State) -> None:
 94        print(
 95            f"Starting Mz with additional_system_parameter_defaults = {self.additional_system_parameter_defaults}"
 96        )
 97
 98        with c.override(
 99            Materialized(
100                name=state.mz_service,
101                external_blob_store=True,
102                blob_store_is_azure=c.blob_store() == "azurite",
103                external_metadata_store=True,
104                deploy_generation=state.deploy_generation,
105                system_parameter_defaults=state.system_parameter_defaults,
106                sanity_restart=False,
107                restart="on-failure",
108                additional_system_parameter_defaults=self.additional_system_parameter_defaults,
109                metadata_store="cockroach",
110                default_replication_factor=2,
111            )
112        ):
113            c.up(state.mz_service)
114
115        for config_param in [
116            "max_tables",
117            "max_sources",
118            "max_objects_per_schema",
119            "max_materialized_views",
120            "max_sinks",
121        ]:
122            c.sql(
123                f"ALTER SYSTEM SET {config_param} TO 1000",
124                user="mz_system",
125                port=6877,
126                print_statement=False,
127                service=state.mz_service,
128            )
129
130        c.sql(
131            """
132            ALTER CLUSTER quickstart SET (MANAGED = false);
133            """,
134            user="mz_system",
135            port=6877,
136            service=state.mz_service,
137        )
138
139        # Make sure all eligible LIMIT queries use the PeekPersist optimization
140        c.sql(
141            "ALTER SYSTEM SET persist_fast_path_limit = 1000000000",
142            user="mz_system",
143            port=6877,
144            service=state.mz_service,
145        )
146
147    def provides(self) -> list[Capability]:
148        return [MzIsRunning()]

Starts a Mz instance (all components are running in the same container).

MzStart( capabilities: materialize.zippy.framework.Capabilities, additional_system_parameter_defaults: dict[str, str] = {})
71    def __init__(
72        self,
73        capabilities: Capabilities,
74        additional_system_parameter_defaults: dict[str, str] = {},
75    ) -> None:
76        if additional_system_parameter_defaults:
77            self.additional_system_parameter_defaults = (
78                additional_system_parameter_defaults
79            )
80        else:
81            self.additional_system_parameter_defaults = {}
82            system_parameter_default = os.getenv("CI_MZ_SYSTEM_PARAMETER_DEFAULT", "")
83            if system_parameter_default:
84                for val in system_parameter_default.split(";"):
85                    x = val.split("=", maxsplit=1)
86                    assert (
87                        len(x) == 2
88                    ), f"CI_MZ_SYSTEM_PARAMETER_DEFAULT '{val}' should be the format <key>=<val>"
89                    self.additional_system_parameter_defaults[x[0]] = x[1]
90
91        super().__init__(capabilities)

Construct a new action, possibly conditioning on the available capabilities.

@classmethod
def requires(cls) -> set[type[materialize.zippy.framework.Capability]]:
63    @classmethod
64    def requires(cls) -> set[type[Capability]]:
65        return {CockroachIsRunning, BlobStoreIsRunning}

Compute the capability classes that this action requires.

@classmethod
def incompatible_with(cls) -> set[type[materialize.zippy.framework.Capability]]:
67    @classmethod
68    def incompatible_with(cls) -> set[type[Capability]]:
69        return {MzIsRunning}

The capability classes that this action is not compatible with.

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
 93    def run(self, c: Composition, state: State) -> None:
 94        print(
 95            f"Starting Mz with additional_system_parameter_defaults = {self.additional_system_parameter_defaults}"
 96        )
 97
 98        with c.override(
 99            Materialized(
100                name=state.mz_service,
101                external_blob_store=True,
102                blob_store_is_azure=c.blob_store() == "azurite",
103                external_metadata_store=True,
104                deploy_generation=state.deploy_generation,
105                system_parameter_defaults=state.system_parameter_defaults,
106                sanity_restart=False,
107                restart="on-failure",
108                additional_system_parameter_defaults=self.additional_system_parameter_defaults,
109                metadata_store="cockroach",
110                default_replication_factor=2,
111            )
112        ):
113            c.up(state.mz_service)
114
115        for config_param in [
116            "max_tables",
117            "max_sources",
118            "max_objects_per_schema",
119            "max_materialized_views",
120            "max_sinks",
121        ]:
122            c.sql(
123                f"ALTER SYSTEM SET {config_param} TO 1000",
124                user="mz_system",
125                port=6877,
126                print_statement=False,
127                service=state.mz_service,
128            )
129
130        c.sql(
131            """
132            ALTER CLUSTER quickstart SET (MANAGED = false);
133            """,
134            user="mz_system",
135            port=6877,
136            service=state.mz_service,
137        )
138
139        # Make sure all eligible LIMIT queries use the PeekPersist optimization
140        c.sql(
141            "ALTER SYSTEM SET persist_fast_path_limit = 1000000000",
142            user="mz_system",
143            port=6877,
144            service=state.mz_service,
145        )

Run this action on the provided composition.

def provides(self) -> list[materialize.zippy.framework.Capability]:
147    def provides(self) -> list[Capability]:
148        return [MzIsRunning()]

Compute the capabilities that this action will make available.

class MzStop(materialize.zippy.framework.Action):
151class MzStop(Action):
152    """Stops the entire Mz instance (all components are running in the same container)."""
153
154    @classmethod
155    def requires(cls) -> set[type[Capability]]:
156        # Technically speaking, we do not need balancerd to be up in order to kill Mz
157        # However, without this protection we frequently end up in a situation where
158        # both are down and Zippy enters a prolonged period of restarting one or the
159        # other and no other useful work can be performed in the meantime.
160        return {MzIsRunning, BalancerdIsRunning}
161
162    def run(self, c: Composition, state: State) -> None:
163        c.kill(state.mz_service)
164
165    def withholds(self) -> set[type[Capability]]:
166        return {MzIsRunning}

Stops the entire Mz instance (all components are running in the same container).

@classmethod
def requires(cls) -> set[type[materialize.zippy.framework.Capability]]:
154    @classmethod
155    def requires(cls) -> set[type[Capability]]:
156        # Technically speaking, we do not need balancerd to be up in order to kill Mz
157        # However, without this protection we frequently end up in a situation where
158        # both are down and Zippy enters a prolonged period of restarting one or the
159        # other and no other useful work can be performed in the meantime.
160        return {MzIsRunning, BalancerdIsRunning}

Compute the capability classes that this action requires.

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
162    def run(self, c: Composition, state: State) -> None:
163        c.kill(state.mz_service)

Run this action on the provided composition.

def withholds(self) -> set[type[materialize.zippy.framework.Capability]]:
165    def withholds(self) -> set[type[Capability]]:
166        return {MzIsRunning}

Compute the capability classes that this action will make unavailable.

class MzRestart(materialize.zippy.framework.Action):
169class MzRestart(Action):
170    """Restarts the entire Mz instance (all components are running in the same container)."""
171
172    @classmethod
173    def requires(cls) -> set[type[Capability]]:
174        return {MzIsRunning}
175
176    def run(self, c: Composition, state: State) -> None:
177        with c.override(
178            Materialized(
179                name=state.mz_service,
180                external_blob_store=True,
181                blob_store_is_azure=c.blob_store() == "azurite",
182                external_metadata_store=True,
183                deploy_generation=state.deploy_generation,
184                system_parameter_defaults=state.system_parameter_defaults,
185                sanity_restart=False,
186                restart="on-failure",
187                metadata_store="cockroach",
188                default_replication_factor=2,
189            )
190        ):
191            c.kill(state.mz_service)
192            c.up(state.mz_service)

Restarts the entire Mz instance (all components are running in the same container).

@classmethod
def requires(cls) -> set[type[materialize.zippy.framework.Capability]]:
172    @classmethod
173    def requires(cls) -> set[type[Capability]]:
174        return {MzIsRunning}

Compute the capability classes that this action requires.

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
176    def run(self, c: Composition, state: State) -> None:
177        with c.override(
178            Materialized(
179                name=state.mz_service,
180                external_blob_store=True,
181                blob_store_is_azure=c.blob_store() == "azurite",
182                external_metadata_store=True,
183                deploy_generation=state.deploy_generation,
184                system_parameter_defaults=state.system_parameter_defaults,
185                sanity_restart=False,
186                restart="on-failure",
187                metadata_store="cockroach",
188                default_replication_factor=2,
189            )
190        ):
191            c.kill(state.mz_service)
192            c.up(state.mz_service)

Run this action on the provided composition.

class Mz0dtDeploy(materialize.zippy.framework.Mz0dtDeployBaseAction):
195class Mz0dtDeploy(Mz0dtDeployBaseAction):
196    """Switches Mz to a new deployment using 0dt."""
197
198    @classmethod
199    def requires(cls) -> set[type[Capability]]:
200        return {MzIsRunning}
201
202    def run(self, c: Composition, state: State) -> None:
203        state.deploy_generation += 1
204
205        state.mz_service = (
206            "materialized" if state.deploy_generation % 2 == 0 else "materialized2"
207        )
208
209        print(f"Deploying generation {state.deploy_generation} on {state.mz_service}")
210
211        with c.override(
212            Materialized(
213                name=state.mz_service,
214                external_blob_store=True,
215                blob_store_is_azure=c.blob_store() == "azurite",
216                external_metadata_store=True,
217                deploy_generation=state.deploy_generation,
218                system_parameter_defaults=state.system_parameter_defaults,
219                sanity_restart=False,
220                restart="on-failure",
221                healthcheck=LEADER_STATUS_HEALTHCHECK,
222                metadata_store="cockroach",
223                default_replication_factor=2,
224            ),
225        ):
226            c.up(state.mz_service, detach=True)
227            c.await_mz_deployment_status(
228                DeploymentStatus.READY_TO_PROMOTE, state.mz_service
229            )
230            c.promote_mz(state.mz_service)
231            c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, state.mz_service)
232            c.stop(
233                (
234                    "materialized2"
235                    if state.mz_service == "materialized"
236                    else "materialized"
237                ),
238                wait=True,
239            )

Switches Mz to a new deployment using 0dt.

@classmethod
def requires(cls) -> set[type[materialize.zippy.framework.Capability]]:
198    @classmethod
199    def requires(cls) -> set[type[Capability]]:
200        return {MzIsRunning}

Compute the capability classes that this action requires.

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
202    def run(self, c: Composition, state: State) -> None:
203        state.deploy_generation += 1
204
205        state.mz_service = (
206            "materialized" if state.deploy_generation % 2 == 0 else "materialized2"
207        )
208
209        print(f"Deploying generation {state.deploy_generation} on {state.mz_service}")
210
211        with c.override(
212            Materialized(
213                name=state.mz_service,
214                external_blob_store=True,
215                blob_store_is_azure=c.blob_store() == "azurite",
216                external_metadata_store=True,
217                deploy_generation=state.deploy_generation,
218                system_parameter_defaults=state.system_parameter_defaults,
219                sanity_restart=False,
220                restart="on-failure",
221                healthcheck=LEADER_STATUS_HEALTHCHECK,
222                metadata_store="cockroach",
223                default_replication_factor=2,
224            ),
225        ):
226            c.up(state.mz_service, detach=True)
227            c.await_mz_deployment_status(
228                DeploymentStatus.READY_TO_PROMOTE, state.mz_service
229            )
230            c.promote_mz(state.mz_service)
231            c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, state.mz_service)
232            c.stop(
233                (
234                    "materialized2"
235                    if state.mz_service == "materialized"
236                    else "materialized"
237                ),
238                wait=True,
239            )

Run this action on the provided composition.

class KillClusterd(materialize.zippy.framework.Action):
242class KillClusterd(Action):
243    """Kills the clusterd processes in the environmentd container. The process orchestrator will restart them."""
244
245    @classmethod
246    def requires(cls) -> set[type[Capability]]:
247        return {MzIsRunning, ViewExists}
248
249    def run(self, c: Composition, state: State) -> None:
250        # Depending on the workload, clusterd may not be running, hence the || true
251        c.exec(state.mz_service, "bash", "-c", "kill -9 `pidof clusterd` || true")

Kills the clusterd processes in the environmentd container. The process orchestrator will restart them.

@classmethod
def requires(cls) -> set[type[materialize.zippy.framework.Capability]]:
245    @classmethod
246    def requires(cls) -> set[type[Capability]]:
247        return {MzIsRunning, ViewExists}

Compute the capability classes that this action requires.

def run( self, c: materialize.mzcompose.composition.Composition, state: materialize.zippy.framework.State) -> None:
249    def run(self, c: Composition, state: State) -> None:
250        # Depending on the workload, clusterd may not be running, hence the || true
251        c.exec(state.mz_service, "bash", "-c", "kill -9 `pidof clusterd` || true")

Run this action on the provided composition.