misc.python.materialize.teleport

 1# Copyright Materialize, Inc. and contributors. All rights reserved.
 2#
 3# Use of this software is governed by the Business Source License
 4# included in the LICENSE file at the root of this repository.
 5#
 6# As of the Change Date specified in that file, in accordance with
 7# the Business Source License, use of this software will be governed
 8# by the Apache License, Version 2.0.
 9
10import os
11import subprocess
12import threading
13import time
14from textwrap import dedent
15
16import psutil
17
18from materialize import build_config, ui
19
20
class TeleportProxy:
    """Start and track background `tsh proxy app` processes.

    The PID and address of each proxy are kept in the local Teleport state
    (`build_config.TeleportLocalState`), keyed by app name.
    """

    @classmethod
    def spawn(cls, app_name: str, port: str):
        """Spawn a Teleport proxy for the provided app_name.

        When `check` finds a live process for `app_name`, only its PID is
        reported. Otherwise the recorded PID and address are cleared,
        `tsh proxy app <app_name> --port <port>` is launched in its own
        process group with output discarded, and this method returns
        immediately. A background thread watches the child for up to two
        seconds and then either records its PID and
        `http://localhost:<port>` address in the local Teleport state, or
        warns that it exited.
        """

        teleport_state = build_config.TeleportLocalState.read()

        # If there is already a Teleport proxy running, no need to restart one.
        running_pid = TeleportProxy.check(app_name)
        if running_pid:
            ui.say(f"Teleport proxy already running, PID: {running_pid}")
            return

        # Whatever PID was recorded is not alive; clear the stale entry.
        teleport_state.set_pid(app_name, None)
        teleport_state.set_address(app_name, None)
        teleport_state.write()

        # Otherwise spawn a Teleport proxy.
        cmd_args = ["tsh", "proxy", "app", f"{app_name}", "--port", port]
        child = subprocess.Popen(
            cmd_args,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            preexec_fn=os.setpgrp,
        )
        ui.say(f"starting Teleport proxy for '{app_name}'...")

        def wait(child, teleport_state, address):
            """Watch the child through its startup window, then record or warn."""
            try:
                # Block until the child exits or the two-second startup
                # window elapses, instead of spinning on poll().
                exit_code = child.wait(timeout=2)
            except subprocess.TimeoutExpired:
                exit_code = None

            # Compare against None: an exit code of 0 is falsy but still
            # means the proxy is no longer running.
            if exit_code is not None:
                other_tshs = [
                    p.pid
                    for p in psutil.process_iter(["pid", "name"])
                    if p.name() == "tsh"
                ]
                ui.warn(
                    dedent(
                        f"""
                Teleport proxy failed to start, 'tsh' process already running!
                    existing 'tsh' processes: {other_tshs}
                    exit code: {exit_code}
                """
                    )
                )
                # The child is gone; never record its (possibly recycled) PID.
                return

            # Still running after the startup window. Double-check the PID
            # before recording it.
            if psutil.pid_exists(child.pid):
                teleport_state.set_pid(app_name, child.pid)
                teleport_state.set_address(app_name, address)
                teleport_state.write()

        # Spawn a thread that will wait for the Teleport proxy to start, and
        # record its PID, or warn that it failed to start.
        address = f"http://localhost:{port}"
        thread = threading.Thread(target=wait, args=[child, teleport_state, address])
        thread.start()

    @classmethod
    def check(cls, app_name: str) -> str | None:
        """Check if a Teleport proxy is already running for the specified app_name.

        Returns the PID recorded in the local Teleport state when a process
        with that PID currently exists, otherwise None. Only PID existence is
        tested; the process is not verified to be a `tsh` proxy.
        """

        teleport_state = build_config.TeleportLocalState.read()
        existing_pid = teleport_state.get_pid(app_name)

        if existing_pid and psutil.pid_exists(int(existing_pid)):
            # Return the value that was just validated rather than reading
            # the state a second time.
            return existing_pid
        return None
class TeleportProxy:
class TeleportProxy:
    """Start and track background `tsh proxy app` processes.

    The PID and address of each proxy are kept in the local Teleport state
    (`build_config.TeleportLocalState`), keyed by app name.
    """

    @classmethod
    def spawn(cls, app_name: str, port: str):
        """Spawn a Teleport proxy for the provided app_name.

        When `check` finds a live process for `app_name`, only its PID is
        reported. Otherwise the recorded PID and address are cleared,
        `tsh proxy app <app_name> --port <port>` is launched in its own
        process group with output discarded, and this method returns
        immediately. A background thread watches the child for up to two
        seconds and then either records its PID and
        `http://localhost:<port>` address in the local Teleport state, or
        warns that it exited.
        """

        teleport_state = build_config.TeleportLocalState.read()

        # If there is already a Teleport proxy running, no need to restart one.
        running_pid = TeleportProxy.check(app_name)
        if running_pid:
            ui.say(f"Teleport proxy already running, PID: {running_pid}")
            return

        # Whatever PID was recorded is not alive; clear the stale entry.
        teleport_state.set_pid(app_name, None)
        teleport_state.set_address(app_name, None)
        teleport_state.write()

        # Otherwise spawn a Teleport proxy.
        cmd_args = ["tsh", "proxy", "app", f"{app_name}", "--port", port]
        child = subprocess.Popen(
            cmd_args,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            preexec_fn=os.setpgrp,
        )
        ui.say(f"starting Teleport proxy for '{app_name}'...")

        def wait(child, teleport_state, address):
            """Watch the child through its startup window, then record or warn."""
            try:
                # Block until the child exits or the two-second startup
                # window elapses, instead of spinning on poll().
                exit_code = child.wait(timeout=2)
            except subprocess.TimeoutExpired:
                exit_code = None

            # Compare against None: an exit code of 0 is falsy but still
            # means the proxy is no longer running.
            if exit_code is not None:
                other_tshs = [
                    p.pid
                    for p in psutil.process_iter(["pid", "name"])
                    if p.name() == "tsh"
                ]
                ui.warn(
                    dedent(
                        f"""
                Teleport proxy failed to start, 'tsh' process already running!
                    existing 'tsh' processes: {other_tshs}
                    exit code: {exit_code}
                """
                    )
                )
                # The child is gone; never record its (possibly recycled) PID.
                return

            # Still running after the startup window. Double-check the PID
            # before recording it.
            if psutil.pid_exists(child.pid):
                teleport_state.set_pid(app_name, child.pid)
                teleport_state.set_address(app_name, address)
                teleport_state.write()

        # Spawn a thread that will wait for the Teleport proxy to start, and
        # record its PID, or warn that it failed to start.
        address = f"http://localhost:{port}"
        thread = threading.Thread(target=wait, args=[child, teleport_state, address])
        thread.start()

    @classmethod
    def check(cls, app_name: str) -> str | None:
        """Check if a Teleport proxy is already running for the specified app_name.

        Returns the PID recorded in the local Teleport state when a process
        with that PID currently exists, otherwise None. Only PID existence is
        tested; the process is not verified to be a `tsh` proxy.
        """

        teleport_state = build_config.TeleportLocalState.read()
        existing_pid = teleport_state.get_pid(app_name)

        if existing_pid and psutil.pid_exists(int(existing_pid)):
            # Return the value that was just validated rather than reading
            # the state a second time.
            return existing_pid
        return None
@classmethod
def spawn(cls, app_name: str, port: str):
23    @classmethod
24    def spawn(cls, app_name: str, port: str):
25        """Spawn a Teleport proxy for the provided app_name."""
26
27        teleport_state = build_config.TeleportLocalState.read()
28
29        # If there is already a Teleport proxy running, no need to restart one.
30        running_pid = TeleportProxy.check(app_name)
31        if running_pid:
32            ui.say(f"Teleport proxy already running, PID: {running_pid}")
33            return
34        else:
35            # If the existing PID doesn't exist, clear it from state.
36            teleport_state.set_pid(app_name, None)
37            teleport_state.set_address(app_name, None)
38            teleport_state.write()
39
40        # Otherwise spawn a Teleport proxy.
41        cmd_args = ["tsh", "proxy", "app", f"{app_name}", "--port", port]
42        child = subprocess.Popen(
43            cmd_args,
44            stdout=subprocess.DEVNULL,
45            stderr=subprocess.DEVNULL,
46            preexec_fn=os.setpgrp,
47        )
48        ui.say(f"starting Teleport proxy for '{app_name}'...")
49
50        def wait(child, teleport_state, address):
51            wait_start = time.time()
52
53            while time.time() - wait_start < 2:
54                child_terminated = child.poll()
55                if child_terminated:
56                    other_tshs = [
57                        p.pid
58                        for p in psutil.process_iter(["pid", "name"])
59                        if p.name() == "tsh"
60                    ]
61                    ui.warn(
62                        dedent(
63                            f"""
64                    Teleport proxy failed to start, 'tsh' process already running!
65                        existing 'tsh' processes: {other_tshs}
66                        exit code: {child_terminated}
67                    """
68                        )
69                    )
70                    break
71
72            # Timed out! Check if the process is running.
73            child_pid_status = psutil.pid_exists(child.pid)
74            if child_pid_status:
75                # Record the PID, if the process started successfully.
76                teleport_state.set_pid(app_name, child.pid)
77                teleport_state.set_address(app_name, address)
78                teleport_state.write()
79
80        # Spawn a thread that will wait for the Teleport proxy to start, and
81        # record it's PID, or warn that it failed to start.
82        address = f"http://localhost:{port}"
83        thread = threading.Thread(target=wait, args=[child, teleport_state, address])
84        thread.start()

Spawn a Teleport proxy for the provided app_name.

@classmethod
def check(cls, app_name: str) -> str | None:
86    @classmethod
87    def check(cls, app_name: str) -> str | None:
88        """Check if a Teleport proxy is already running for the specified app_name."""
89
90        teleport_state = build_config.TeleportLocalState.read()
91        existing_pid = teleport_state.get_pid(app_name)
92
93        if existing_pid and psutil.pid_exists(int(existing_pid)):
94            return teleport_state.get_pid(app_name)
95        else:
96            return None

Check if a Teleport proxy is already running for the specified app_name.