misc.python.materialize.checks.cloudtest_actions

 1# Copyright Materialize, Inc. and contributors. All rights reserved.
 2#
 3# Use of this software is governed by the Business Source License
 4# included in the LICENSE file at the root of this repository.
 5#
 6# As of the Change Date specified in that file, in accordance with
 7# the Business Source License, use of this software will be governed
 8# by the Apache License, Version 2.0.
 9
import shlex
from textwrap import dedent
from typing import Any

from materialize.checks.actions import Action
from materialize.checks.executors import Executor
from materialize.cloudtest.app.materialize_application import MaterializeApplication
from materialize.cloudtest.k8s.environmentd import EnvironmentdStatefulSet
from materialize.mz_version import MzVersion
18
19
class ReplaceEnvironmentdStatefulSet(Action):
    """Change the image tag of the environmentd stateful set, re-create the definition and replace the existing one."""

    # Image tag to switch to. A falsy value (None or "") makes execute() take
    # the target version from MzVersion.parse_cargo() instead.
    new_tag: str | None

    def __init__(self, new_tag: str | None = None) -> None:
        """Remember the image tag that execute() will apply."""
        self.new_tag = new_tag

    def execute(self, e: Executor) -> None:
        """Retag and replace the single environmentd stateful set.

        The target version is parsed from ``new_tag`` when one was given and
        taken from ``MzVersion.parse_cargo()`` otherwise. After the stateful
        set's ``replace()`` call, that version is stored on the executor as
        ``current_mz_version``.

        Raises:
            AssertionError: if the cloudtest application does not hold exactly
                one ``EnvironmentdStatefulSet`` resource.
        """
        new_version = (
            MzVersion.parse_mz(self.new_tag)
            if self.new_tag
            else MzVersion.parse_cargo()
        )
        print(
            f"Replacing environmentd stateful set from version {e.current_mz_version} to version {new_version}"
        )
        mz = e.cloudtest_application()
        # Exact type match on purpose: instances of subclasses are not selected.
        candidates = [
            resource
            for resource in mz.resources
            if type(resource) is EnvironmentdStatefulSet
        ]
        assert len(candidates) == 1, (
            "expected exactly one EnvironmentdStatefulSet resource, "
            f"found {len(candidates)}"
        )
        stateful_set = candidates[0]

        # The raw tag is assigned as-is, so it stays None when no tag was given.
        stateful_set.tag = self.new_tag
        stateful_set.replace()
        e.current_mz_version = new_version

    def join(self, e: Executor) -> None:
        """Nothing to wait for."""
        # execute is blocking already
        pass
53
54
class SetupSshTunnels(Action):
    """Prepare the SSH tunnels."""

    def __init__(self, mz: MaterializeApplication) -> None:
        """Store the application used for the SQL queries and kubectl calls."""
        # Whatever e.testdrive() returns in execute(); passed to e.join() in join().
        self.handle: Any | None = None
        self.mz = mz

    def execute(self, e: Executor) -> None:
        """Create the SSH tunnel connections and authorize their public keys.

        Submits one ``CREATE CONNECTION IF NOT EXISTS ssh_tunnel_<i>`` statement
        per connection through testdrive, then reads each connection's
        ``public_key_1`` via SQL and appends it to ``/etc/authorized_keys/mz``
        through ``kubectl exec`` on ``svc/ssh-bastion-host``.
        """
        connection_count = 4
        self.handle = e.testdrive(
            "\n".join(
                [
                    dedent(
                        f"""
                        > CREATE CONNECTION IF NOT EXISTS ssh_tunnel_{i} TO SSH TUNNEL (
                            HOST 'ssh-bastion-host',
                            USER 'mz',
                            PORT 22
                            );
                        """
                    )
                    for i in range(connection_count)
                ]
            )
        )

        # NOTE(review): the keys are queried before join() waits on the handle;
        # this presumably relies on e.testdrive() having created the
        # connections by the time it returns -- confirm.
        for i in range(connection_count):
            public_key = self.mz.environmentd.sql_query(
                "SELECT public_key_1 FROM mz_ssh_tunnel_connections ssh"
                " JOIN mz_connections c ON c.id = ssh.id"
                f" WHERE c.name = 'ssh_tunnel_{i}';"
            )[0][0]

            # Add public key to SSH bastion host. The key is shell-quoted so
            # that a quote or other special character in it cannot change the
            # command that bash runs.
            self.mz.kubectl(
                "exec",
                "svc/ssh-bastion-host",
                "--",
                "bash",
                "-c",
                f"echo {shlex.quote(str(public_key))} >> /etc/authorized_keys/mz",
            )

    def join(self, e: Executor) -> None:
        """Hand the handle stored by execute() to the executor's join()."""
        e.join(self.handle)
class ReplaceEnvironmentdStatefulSet(materialize.checks.actions.Action):
class ReplaceEnvironmentdStatefulSet(Action):
    """Change the image tag of the environmentd stateful set, re-create the definition and replace the existing one."""

    # Image tag to switch to. A falsy value (None or "") makes execute() take
    # the target version from MzVersion.parse_cargo() instead.
    new_tag: str | None

    def __init__(self, new_tag: str | None = None) -> None:
        """Remember the image tag that execute() will apply."""
        self.new_tag = new_tag

    def execute(self, e: Executor) -> None:
        """Retag and replace the single environmentd stateful set.

        The target version is parsed from ``new_tag`` when one was given and
        taken from ``MzVersion.parse_cargo()`` otherwise. After the stateful
        set's ``replace()`` call, that version is stored on the executor as
        ``current_mz_version``.

        Raises:
            AssertionError: if the cloudtest application does not hold exactly
                one ``EnvironmentdStatefulSet`` resource.
        """
        new_version = (
            MzVersion.parse_mz(self.new_tag)
            if self.new_tag
            else MzVersion.parse_cargo()
        )
        print(
            f"Replacing environmentd stateful set from version {e.current_mz_version} to version {new_version}"
        )
        mz = e.cloudtest_application()
        # Exact type match on purpose: instances of subclasses are not selected.
        candidates = [
            resource
            for resource in mz.resources
            if type(resource) is EnvironmentdStatefulSet
        ]
        assert len(candidates) == 1, (
            "expected exactly one EnvironmentdStatefulSet resource, "
            f"found {len(candidates)}"
        )
        stateful_set = candidates[0]

        # The raw tag is assigned as-is, so it stays None when no tag was given.
        stateful_set.tag = self.new_tag
        stateful_set.replace()
        e.current_mz_version = new_version

    def join(self, e: Executor) -> None:
        """Nothing to wait for."""
        # execute is blocking already
        pass

Change the image tag of the environmentd stateful set, re-create the definition and replace the existing one.

ReplaceEnvironmentdStatefulSet(new_tag: str | None = None)
26    def __init__(self, new_tag: str | None = None) -> None:
27        self.new_tag = new_tag
new_tag: str | None
def execute(self, e: materialize.checks.executors.Executor) -> None:
29    def execute(self, e: Executor) -> None:
30        new_version = (
31            MzVersion.parse_mz(self.new_tag)
32            if self.new_tag
33            else MzVersion.parse_cargo()
34        )
35        print(
36            f"Replacing environmentd stateful set from version {e.current_mz_version} to version {new_version}"
37        )
38        mz = e.cloudtest_application()
39        stateful_set = [
40            resource
41            for resource in mz.resources
42            if type(resource) == EnvironmentdStatefulSet
43        ]
44        assert len(stateful_set) == 1
45        stateful_set = stateful_set[0]
46
47        stateful_set.tag = self.new_tag
48        stateful_set.replace()
49        e.current_mz_version = new_version
def join(self, e: materialize.checks.executors.Executor) -> None:
51    def join(self, e: Executor) -> None:
52        # execute is blocking already
53        pass
class SetupSshTunnels(materialize.checks.actions.Action):
 56class SetupSshTunnels(Action):
 57    """Prepare the SSH tunnels."""
 58
 59    def __init__(self, mz: MaterializeApplication) -> None:
 60        self.handle: Any | None = None
 61        self.mz = mz
 62
 63    def execute(self, e: Executor) -> None:
 64        connection_count = 4
 65        self.handle = e.testdrive(
 66            "\n".join(
 67                [
 68                    dedent(
 69                        f"""
 70                        > CREATE CONNECTION IF NOT EXISTS ssh_tunnel_{i} TO SSH TUNNEL (
 71                            HOST 'ssh-bastion-host',
 72                            USER 'mz',
 73                            PORT 22
 74                            );
 75                        """
 76                    )
 77                    for i in range(connection_count)
 78                ]
 79            )
 80        )
 81
 82        for i in range(connection_count):
 83            public_key = self.mz.environmentd.sql_query(
 84                "SELECT public_key_1 FROM mz_ssh_tunnel_connections ssh"
 85                " JOIN mz_connections c ON c.id = ssh.id"
 86                f" WHERE c.name = 'ssh_tunnel_{i}';"
 87            )[0][0]
 88
 89            # Add public key to SSH bastion host
 90            self.mz.kubectl(
 91                "exec",
 92                "svc/ssh-bastion-host",
 93                "--",
 94                "bash",
 95                "-c",
 96                f"echo '{public_key}' >> /etc/authorized_keys/mz",
 97            )
 98
 99    def join(self, e: Executor) -> None:
100        e.join(self.handle)

Prepare the SSH tunnels.

SetupSshTunnels( mz: materialize.cloudtest.app.materialize_application.MaterializeApplication)
59    def __init__(self, mz: MaterializeApplication) -> None:
60        self.handle: Any | None = None
61        self.mz = mz
handle: typing.Any | None
mz
def execute(self, e: materialize.checks.executors.Executor) -> None:
63    def execute(self, e: Executor) -> None:
64        connection_count = 4
65        self.handle = e.testdrive(
66            "\n".join(
67                [
68                    dedent(
69                        f"""
70                        > CREATE CONNECTION IF NOT EXISTS ssh_tunnel_{i} TO SSH TUNNEL (
71                            HOST 'ssh-bastion-host',
72                            USER 'mz',
73                            PORT 22
74                            );
75                        """
76                    )
77                    for i in range(connection_count)
78                ]
79            )
80        )
81
82        for i in range(connection_count):
83            public_key = self.mz.environmentd.sql_query(
84                "SELECT public_key_1 FROM mz_ssh_tunnel_connections ssh"
85                " JOIN mz_connections c ON c.id = ssh.id"
86                f" WHERE c.name = 'ssh_tunnel_{i}';"
87            )[0][0]
88
89            # Add public key to SSH bastion host
90            self.mz.kubectl(
91                "exec",
92                "svc/ssh-bastion-host",
93                "--",
94                "bash",
95                "-c",
96                f"echo '{public_key}' >> /etc/authorized_keys/mz",
97            )
def join(self, e: materialize.checks.executors.Executor) -> None:
 99    def join(self, e: Executor) -> None:
100        e.join(self.handle)