misc.python.materialize.checks.cloudtest_actions
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

from textwrap import dedent
from typing import Any

from materialize.checks.actions import Action
from materialize.checks.executors import Executor
from materialize.cloudtest.app.materialize_application import MaterializeApplication
from materialize.cloudtest.k8s.environmentd import EnvironmentdStatefulSet
from materialize.mz_version import MzVersion


class ReplaceEnvironmentdStatefulSet(Action):
    """Swap the environmentd stateful set for one that uses a different image tag.

    The tag of the existing stateful set resource is overwritten and the
    resource is then replaced.
    """

    new_tag: str | None

    def __init__(self, new_tag: str | None = None) -> None:
        # A falsy tag makes execute() derive the version via MzVersion.parse_cargo().
        self.new_tag = new_tag

    def execute(self, e: Executor) -> None:
        """Re-tag and replace the stateful set, then record the new version on ``e``."""
        if self.new_tag:
            new_version = MzVersion.parse_mz(self.new_tag)
        else:
            new_version = MzVersion.parse_cargo()

        print(
            f"Replacing environmentd stateful set from version {e.current_mz_version} to version {new_version}"
        )

        app = e.cloudtest_application()

        # Exact type comparison: instances of subclasses are not selected.
        candidates = []
        for resource in app.resources:
            if type(resource) == EnvironmentdStatefulSet:
                candidates.append(resource)
        assert len(candidates) == 1

        environmentd_set = candidates[0]
        environmentd_set.tag = self.new_tag
        environmentd_set.replace()
        e.current_mz_version = new_version

    def join(self, e: Executor) -> None:
        """Nothing to wait for."""
        # execute is blocking already
        pass


class SetupSshTunnels(Action):
    """Create the SSH tunnel connections and register their public keys on the bastion host."""

    def __init__(self, mz: MaterializeApplication) -> None:
        self.mz = mz
        # Assigned by execute() from e.testdrive(); handed to e.join() in join().
        self.handle: Any | None = None

    def execute(self, e: Executor) -> None:
        """Submit the CREATE CONNECTION script, then append each connection's key on the bastion."""
        connection_count = 4

        # One testdrive statement per tunnel, joined into a single script.
        statements = []
        for i in range(connection_count):
            statements.append(
                dedent(
                    f"""
                    > CREATE CONNECTION IF NOT EXISTS ssh_tunnel_{i} TO SSH TUNNEL (
                        HOST 'ssh-bastion-host',
                        USER 'mz',
                        PORT 22
                    );
                    """
                )
            )
        self.handle = e.testdrive("\n".join(statements))

        for i in range(connection_count):
            key_query = (
                "SELECT public_key_1 FROM mz_ssh_tunnel_connections ssh"
                " JOIN mz_connections c ON c.id = ssh.id"
                f" WHERE c.name = 'ssh_tunnel_{i}';"
            )
            rows = self.mz.environmentd.sql_query(key_query)
            public_key = rows[0][0]

            # Add public key to SSH bastion host
            append_key_cmd = f"echo '{public_key}' >> /etc/authorized_keys/mz"
            self.mz.kubectl(
                "exec",
                "svc/ssh-bastion-host",
                "--",
                "bash",
                "-c",
                append_key_cmd,
            )

    def join(self, e: Executor) -> None:
        """Wait on the handle stored by execute()."""
        e.join(self.handle)
class
ReplaceEnvironmentdStatefulSet(materialize.checks.actions.Action):
class ReplaceEnvironmentdStatefulSet(Action):
    """Swap the environmentd stateful set for one that uses a different image tag.

    The tag of the existing stateful set resource is overwritten and the
    resource is then replaced.
    """

    new_tag: str | None

    def __init__(self, new_tag: str | None = None) -> None:
        # A falsy tag makes execute() derive the version via MzVersion.parse_cargo().
        self.new_tag = new_tag

    def execute(self, e: Executor) -> None:
        """Re-tag and replace the stateful set, then record the new version on ``e``."""
        if self.new_tag:
            new_version = MzVersion.parse_mz(self.new_tag)
        else:
            new_version = MzVersion.parse_cargo()

        print(
            f"Replacing environmentd stateful set from version {e.current_mz_version} to version {new_version}"
        )

        app = e.cloudtest_application()

        # Exact type comparison: instances of subclasses are not selected.
        candidates = []
        for resource in app.resources:
            if type(resource) == EnvironmentdStatefulSet:
                candidates.append(resource)
        assert len(candidates) == 1

        environmentd_set = candidates[0]
        environmentd_set.tag = self.new_tag
        environmentd_set.replace()
        e.current_mz_version = new_version

    def join(self, e: Executor) -> None:
        """Nothing to wait for."""
        # execute is blocking already
        pass
Change the image tag of the environmentd stateful set, re-create the definition and replace the existing one.
def
execute(self, e: materialize.checks.executors.Executor) -> None:
def execute(self, e: Executor) -> None:
    """Re-tag and replace the environmentd stateful set.

    The target version is parsed from ``self.new_tag`` when it is set and
    from ``MzVersion.parse_cargo()`` otherwise; it is stored on
    ``e.current_mz_version`` once the replacement call has returned.
    """
    if self.new_tag:
        new_version = MzVersion.parse_mz(self.new_tag)
    else:
        new_version = MzVersion.parse_cargo()

    print(
        f"Replacing environmentd stateful set from version {e.current_mz_version} to version {new_version}"
    )

    app = e.cloudtest_application()

    # Exact type comparison: instances of subclasses are not selected.
    candidates = []
    for resource in app.resources:
        if type(resource) == EnvironmentdStatefulSet:
            candidates.append(resource)
    assert len(candidates) == 1

    environmentd_set = candidates[0]
    environmentd_set.tag = self.new_tag
    environmentd_set.replace()
    e.current_mz_version = new_version
class
SetupSshTunnels(materialize.checks.actions.Action):
class SetupSshTunnels(Action):
    """Create the SSH tunnel connections and register their public keys on the bastion host."""

    def __init__(self, mz: MaterializeApplication) -> None:
        self.mz = mz
        # Assigned by execute() from e.testdrive(); handed to e.join() in join().
        self.handle: Any | None = None

    def execute(self, e: Executor) -> None:
        """Submit the CREATE CONNECTION script, then append each connection's key on the bastion."""
        connection_count = 4

        # One testdrive statement per tunnel, joined into a single script.
        statements = []
        for i in range(connection_count):
            statements.append(
                dedent(
                    f"""
                    > CREATE CONNECTION IF NOT EXISTS ssh_tunnel_{i} TO SSH TUNNEL (
                        HOST 'ssh-bastion-host',
                        USER 'mz',
                        PORT 22
                    );
                    """
                )
            )
        self.handle = e.testdrive("\n".join(statements))

        for i in range(connection_count):
            key_query = (
                "SELECT public_key_1 FROM mz_ssh_tunnel_connections ssh"
                " JOIN mz_connections c ON c.id = ssh.id"
                f" WHERE c.name = 'ssh_tunnel_{i}';"
            )
            rows = self.mz.environmentd.sql_query(key_query)
            public_key = rows[0][0]

            # Add public key to SSH bastion host
            append_key_cmd = f"echo '{public_key}' >> /etc/authorized_keys/mz"
            self.mz.kubectl(
                "exec",
                "svc/ssh-bastion-host",
                "--",
                "bash",
                "-c",
                append_key_cmd,
            )

    def join(self, e: Executor) -> None:
        """Wait on the handle stored by execute()."""
        e.join(self.handle)
Prepare the SSH tunnels.
def
execute(self, e: materialize.checks.executors.Executor) -> None:
def execute(self, e: Executor) -> None:
    """Create four SSH tunnel connections and authorize their keys on the bastion host.

    The CREATE CONNECTION statements are submitted as one testdrive script
    whose handle is kept in ``self.handle``. Each connection's
    ``public_key_1`` is then queried and appended to
    ``/etc/authorized_keys/mz`` on ``svc/ssh-bastion-host``.
    """
    connection_count = 4

    # One testdrive statement per tunnel, joined into a single script.
    statements = []
    for i in range(connection_count):
        statements.append(
            dedent(
                f"""
                > CREATE CONNECTION IF NOT EXISTS ssh_tunnel_{i} TO SSH TUNNEL (
                    HOST 'ssh-bastion-host',
                    USER 'mz',
                    PORT 22
                );
                """
            )
        )
    self.handle = e.testdrive("\n".join(statements))

    for i in range(connection_count):
        key_query = (
            "SELECT public_key_1 FROM mz_ssh_tunnel_connections ssh"
            " JOIN mz_connections c ON c.id = ssh.id"
            f" WHERE c.name = 'ssh_tunnel_{i}';"
        )
        rows = self.mz.environmentd.sql_query(key_query)
        public_key = rows[0][0]

        # Add public key to SSH bastion host
        append_key_cmd = f"echo '{public_key}' >> /etc/authorized_keys/mz"
        self.mz.kubectl(
            "exec",
            "svc/ssh-bastion-host",
            "--",
            "bash",
            "-c",
            append_key_cmd,
        )