misc.python.materialize.teleport
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import os
import subprocess
import threading
import time
from textwrap import dedent

import psutil

from materialize import build_config, ui


class TeleportProxy:
    """Spawn `tsh proxy app` processes and track them in the local Teleport state."""

    @classmethod
    def spawn(cls, app_name: str, port: str):
        """Spawn a Teleport proxy for the provided app_name.

        If the PID recorded for `app_name` still belongs to a live process,
        this only reports that PID. Otherwise the stale record is cleared,
        `tsh proxy app` is started in its own process group, and a background
        thread records the new PID and address once the process has stayed
        alive for a short grace period.

        Args:
            app_name: Teleport application to proxy.
            port: Local port handed to `tsh` via `--port`.
        """

        teleport_state = build_config.TeleportLocalState.read()

        # If there is already a Teleport proxy running, no need to restart one.
        running_pid = TeleportProxy.check(app_name)
        if running_pid:
            ui.say(f"Teleport proxy already running, PID: {running_pid}")
            return

        # The recorded PID (if any) is no longer alive, clear it from state.
        teleport_state.set_pid(app_name, None)
        teleport_state.set_address(app_name, None)
        teleport_state.write()

        # Otherwise spawn a Teleport proxy.
        cmd_args = ["tsh", "proxy", "app", f"{app_name}", "--port", port]
        child = subprocess.Popen(
            cmd_args,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            preexec_fn=os.setpgrp,
        )
        ui.say(f"starting Teleport proxy for '{app_name}'...")

        def wait(child, teleport_state, address):
            # Seconds the proxy must stay alive before it counts as started.
            startup_grace = 2
            # Sleep between polls so this thread does not spin a CPU core.
            poll_interval = 0.05
            # Monotonic clock: immune to wall-clock adjustments.
            deadline = time.monotonic() + startup_grace

            while time.monotonic() < deadline:
                exit_code = child.poll()
                # poll() returns None while running; any exit code, even 0,
                # means the proxy is gone.
                if exit_code is not None:
                    # Use the values prefetched by process_iter(); calling
                    # p.name() again can raise if a process exits meanwhile.
                    other_tshs = [
                        p.pid
                        for p in psutil.process_iter(["pid", "name"])
                        if p.info["name"] == "tsh"
                    ]
                    ui.warn(
                        dedent(
                            f"""
                            Teleport proxy failed to start, 'tsh' process already running!
                                existing 'tsh' processes: {other_tshs}
                                exit code: {exit_code}
                            """
                        )
                    )
                    # The child was reaped by poll(); its PID must not be
                    # recorded, even if the OS has already recycled it.
                    return
                time.sleep(poll_interval)

            # Timed out! Check if the process is running.
            if psutil.pid_exists(child.pid):
                # Record the PID, if the process started successfully.
                teleport_state.set_pid(app_name, child.pid)
                teleport_state.set_address(app_name, address)
                teleport_state.write()

        # Spawn a thread that will wait for the Teleport proxy to start, and
        # record its PID, or warn that it failed to start.
        address = f"http://localhost:{port}"
        thread = threading.Thread(target=wait, args=[child, teleport_state, address])
        thread.start()

    @classmethod
    def check(cls, app_name: str) -> str | None:
        """Check if a Teleport proxy is already running for the specified app_name.

        Returns:
            The PID recorded in the local Teleport state when a process with
            that PID currently exists, otherwise None.
        """

        teleport_state = build_config.TeleportLocalState.read()
        existing_pid = teleport_state.get_pid(app_name)

        # A recorded PID only counts if some process with that PID is alive.
        if existing_pid and psutil.pid_exists(int(existing_pid)):
            return existing_pid
        return None
class
TeleportProxy:
class TeleportProxy:
    """Spawn `tsh proxy app` processes and track them in the local Teleport state."""

    @classmethod
    def spawn(cls, app_name: str, port: str):
        """Spawn a Teleport proxy for the provided app_name.

        If the PID recorded for `app_name` still belongs to a live process,
        this only reports that PID. Otherwise the stale record is cleared,
        `tsh proxy app` is started in its own process group, and a background
        thread records the new PID and address once the process has stayed
        alive for a short grace period.

        Args:
            app_name: Teleport application to proxy.
            port: Local port handed to `tsh` via `--port`.
        """

        teleport_state = build_config.TeleportLocalState.read()

        # If there is already a Teleport proxy running, no need to restart one.
        running_pid = TeleportProxy.check(app_name)
        if running_pid:
            ui.say(f"Teleport proxy already running, PID: {running_pid}")
            return

        # The recorded PID (if any) is no longer alive, clear it from state.
        teleport_state.set_pid(app_name, None)
        teleport_state.set_address(app_name, None)
        teleport_state.write()

        # Otherwise spawn a Teleport proxy.
        cmd_args = ["tsh", "proxy", "app", f"{app_name}", "--port", port]
        child = subprocess.Popen(
            cmd_args,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            preexec_fn=os.setpgrp,
        )
        ui.say(f"starting Teleport proxy for '{app_name}'...")

        def wait(child, teleport_state, address):
            # Seconds the proxy must stay alive before it counts as started.
            startup_grace = 2
            # Sleep between polls so this thread does not spin a CPU core.
            poll_interval = 0.05
            # Monotonic clock: immune to wall-clock adjustments.
            deadline = time.monotonic() + startup_grace

            while time.monotonic() < deadline:
                exit_code = child.poll()
                # poll() returns None while running; any exit code, even 0,
                # means the proxy is gone.
                if exit_code is not None:
                    # Use the values prefetched by process_iter(); calling
                    # p.name() again can raise if a process exits meanwhile.
                    other_tshs = [
                        p.pid
                        for p in psutil.process_iter(["pid", "name"])
                        if p.info["name"] == "tsh"
                    ]
                    ui.warn(
                        dedent(
                            f"""
                            Teleport proxy failed to start, 'tsh' process already running!
                                existing 'tsh' processes: {other_tshs}
                                exit code: {exit_code}
                            """
                        )
                    )
                    # The child was reaped by poll(); its PID must not be
                    # recorded, even if the OS has already recycled it.
                    return
                time.sleep(poll_interval)

            # Timed out! Check if the process is running.
            if psutil.pid_exists(child.pid):
                # Record the PID, if the process started successfully.
                teleport_state.set_pid(app_name, child.pid)
                teleport_state.set_address(app_name, address)
                teleport_state.write()

        # Spawn a thread that will wait for the Teleport proxy to start, and
        # record its PID, or warn that it failed to start.
        address = f"http://localhost:{port}"
        thread = threading.Thread(target=wait, args=[child, teleport_state, address])
        thread.start()

    @classmethod
    def check(cls, app_name: str) -> str | None:
        """Check if a Teleport proxy is already running for the specified app_name.

        Returns:
            The PID recorded in the local Teleport state when a process with
            that PID currently exists, otherwise None.
        """

        teleport_state = build_config.TeleportLocalState.read()
        existing_pid = teleport_state.get_pid(app_name)

        # A recorded PID only counts if some process with that PID is alive.
        if existing_pid and psutil.pid_exists(int(existing_pid)):
            return existing_pid
        return None
@classmethod
def
spawn(cls, app_name: str, port: str):
@classmethod
def spawn(cls, app_name: str, port: str):
    """Spawn a Teleport proxy for the provided app_name.

    If the PID recorded for `app_name` still belongs to a live process,
    this only reports that PID. Otherwise the stale record is cleared,
    `tsh proxy app` is started in its own process group, and a background
    thread records the new PID and address once the process has stayed
    alive for a short grace period.

    Args:
        app_name: Teleport application to proxy.
        port: Local port handed to `tsh` via `--port`.
    """

    teleport_state = build_config.TeleportLocalState.read()

    # If there is already a Teleport proxy running, no need to restart one.
    running_pid = TeleportProxy.check(app_name)
    if running_pid:
        ui.say(f"Teleport proxy already running, PID: {running_pid}")
        return

    # The recorded PID (if any) is no longer alive, clear it from state.
    teleport_state.set_pid(app_name, None)
    teleport_state.set_address(app_name, None)
    teleport_state.write()

    # Otherwise spawn a Teleport proxy.
    cmd_args = ["tsh", "proxy", "app", f"{app_name}", "--port", port]
    child = subprocess.Popen(
        cmd_args,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        preexec_fn=os.setpgrp,
    )
    ui.say(f"starting Teleport proxy for '{app_name}'...")

    def wait(child, teleport_state, address):
        # Seconds the proxy must stay alive before it counts as started.
        startup_grace = 2
        # Sleep between polls so this thread does not spin a CPU core.
        poll_interval = 0.05
        # Monotonic clock: immune to wall-clock adjustments.
        deadline = time.monotonic() + startup_grace

        while time.monotonic() < deadline:
            exit_code = child.poll()
            # poll() returns None while running; any exit code, even 0,
            # means the proxy is gone.
            if exit_code is not None:
                # Use the values prefetched by process_iter(); calling
                # p.name() again can raise if a process exits meanwhile.
                other_tshs = [
                    p.pid
                    for p in psutil.process_iter(["pid", "name"])
                    if p.info["name"] == "tsh"
                ]
                ui.warn(
                    dedent(
                        f"""
                        Teleport proxy failed to start, 'tsh' process already running!
                            existing 'tsh' processes: {other_tshs}
                            exit code: {exit_code}
                        """
                    )
                )
                # The child was reaped by poll(); its PID must not be
                # recorded, even if the OS has already recycled it.
                return
            time.sleep(poll_interval)

        # Timed out! Check if the process is running.
        if psutil.pid_exists(child.pid):
            # Record the PID, if the process started successfully.
            teleport_state.set_pid(app_name, child.pid)
            teleport_state.set_address(app_name, address)
            teleport_state.write()

    # Spawn a thread that will wait for the Teleport proxy to start, and
    # record its PID, or warn that it failed to start.
    address = f"http://localhost:{port}"
    thread = threading.Thread(target=wait, args=[child, teleport_state, address])
    thread.start()
Spawn a Teleport proxy for the provided app_name.
@classmethod
def
check(cls, app_name: str) -> str | None:
@classmethod
def check(cls, app_name: str) -> str | None:
    """Check if a Teleport proxy is already running for the specified app_name.

    Returns:
        The PID recorded in the local Teleport state when a process with
        that PID currently exists, otherwise None.
    """

    teleport_state = build_config.TeleportLocalState.read()
    existing_pid = teleport_state.get_pid(app_name)

    # A recorded PID only counts if some process with that PID is alive.
    if existing_pid and psutil.pid_exists(int(existing_pid)):
        # Return the value that was just validated rather than reading the
        # state a second time.
        return existing_pid
    return None
Check if a Teleport proxy is already running for the specified app_name.