misc.python.materialize.spawn
Utilities for spawning processes.
The functions in this module are a convenient high-level interface to the
operations provided by the standard subprocess
module.
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Utilities for spawning processes.

The functions in this module are a convenient high-level interface to the
operations provided by the standard [`subprocess`][subprocess] module.

[subprocess]: https://docs.python.org/3/library/subprocess.html
"""

import math
import subprocess
import sys
import time
from collections.abc import Callable, Sequence
from pathlib import Path
from typing import IO, TypeVar

from materialize import ui

CalledProcessError = subprocess.CalledProcessError


# NOTE(benesch): Please think twice before adding additional parameters to this
# method! It is meant to serve 95% of callers with a small and understandable
# set of parameters. If your needs are niche, consider calling `subprocess.run`
# directly rather than adding a one-off parameter here.
def runv(
    args: Sequence[Path | str],
    *,
    cwd: Path | None = None,
    env: dict[str, str] | None = None,
    stdin: None | int | IO[bytes] | bytes = None,
    stdout: None | int | IO[bytes] = None,
    stderr: None | int | IO[bytes] = None,
) -> subprocess.CompletedProcess:
    """Verbosely run a subprocess.

    A description of the subprocess will be written to stderr before the
    subprocess is executed.

    Args:
        args: A list of strings or paths describing the program to run and
            the arguments to pass to it.
        cwd: An optional directory to change into before executing the process.
        env: A replacement environment with which to launch the process. If
            unspecified, the current process's environment is used. Replacement
            occurs wholesale, so use a construction like
            `env=dict(os.environ, KEY=VAL, ...)` to instead amend the existing
            environment.
        stdin: An optional IO handle or byte string to use as the process's
            stdin stream.
        stdout: An optional IO handle to use as the process's stdout stream.
        stderr: An optional IO handle to use as the process's stderr stream.

    Raises:
        OSError: The process cannot be executed, e.g. because the specified
            program does not exist.
        CalledProcessError: The process exited with a non-zero exit status.
    """
    print("$", ui.shell_quote(args), file=sys.stderr)

    # A byte string is fed to the child through `input`; anything else is
    # handed to `subprocess.run` as the `stdin` handle.
    input = None
    if isinstance(stdin, bytes):
        input = stdin
        stdin = None

    return subprocess.run(
        args,
        cwd=cwd,
        env=env,
        input=input,
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
        check=True,
    )


def capture(
    args: Sequence[Path | str],
    *,
    cwd: Path | None = None,
    env: dict[str, str] | None = None,
    stdin: None | int | IO[bytes] | str = None,
    stderr: None | int | IO[bytes] = None,
) -> str:
    """Capture the output of a subprocess.

    Args:
        args: A list of strings or paths describing the program to run and
            the arguments to pass to it.
        cwd: An optional directory to change into before executing the process.
        env: A replacement environment with which to launch the process. If
            unspecified, the current process's environment is used. Replacement
            occurs wholesale, so use a construction like
            `env=dict(os.environ, KEY=VAL, ...)` to instead amend the existing
            environment.
        stdin: An optional IO handle or string to use as the process's
            stdin stream. If unspecified, the process reads an empty stdin.
        stderr: An optional IO handle to use as the process's stderr stream.

    Returns:
        output: The verbatim output of the process as a string. Note that
            trailing whitespace is preserved.

    Raises:
        OSError: The process cannot be executed, e.g. because the specified
            program does not exist.
        CalledProcessError: The process exited with a non-zero exit status.

    .. tip:: Many programs produce output with a trailing newline.
        You may want to call `strip()` on the output to remove any trailing
        whitespace.
    """
    if stdin is None or isinstance(stdin, str):
        # `check_output` treats an explicit `input=None` as empty input, so an
        # unspecified `stdin` has always meant "empty stdin". Spell that out.
        return subprocess.check_output(
            args,
            cwd=cwd,
            env=env,
            input="" if stdin is None else stdin,
            stderr=stderr,
            text=True,
        )

    # A file descriptor or IO handle must be passed as `stdin` on its own:
    # supplying `input` as well (even `input=None`, which `check_output`
    # coerces to "") makes `subprocess` raise ValueError.
    return subprocess.check_output(
        args, cwd=cwd, env=env, stdin=stdin, stderr=stderr, text=True
    )


def run_and_get_return_code(
    args: Sequence[Path | str],
    *,
    cwd: Path | None = None,
    env: dict[str, str] | None = None,
) -> int:
    """Run a subprocess and return the return code.

    The process's stdout is captured and discarded and its stderr is sent to
    the null device. Returns 0 on success, otherwise the exit status reported
    by the raised `CalledProcessError`.
    """
    try:
        capture(args, cwd=cwd, env=env, stderr=subprocess.DEVNULL)
        return 0
    except CalledProcessError as e:
        return e.returncode


T = TypeVar("T")  # Generic type variable


def run_with_retries(fn: Callable[[], T], max_duration: int = 60) -> T:
    """Retry a function until it doesn't raise a `CalledProcessError`, using
    exponential backoff until `max_duration` is reached.

    `fn` is retried `ceil(log2(max_duration))` times, sleeping 1s, 2s, 4s, ...
    between attempts, followed by one final attempt whose exception (if any)
    propagates to the caller. A `max_duration` of 1 or less performs a single
    attempt with no retries.
    """
    # `math.log2` is undefined for values <= 0, so treat any duration that
    # leaves no room for backoff as "no retries" rather than crashing.
    retries = math.ceil(math.log2(max_duration)) if max_duration > 1 else 0
    for retry in range(retries):
        try:
            return fn()
        except subprocess.CalledProcessError as e:
            sleep_time = 2**retry
            print(f"Failed: {e}, retrying in {sleep_time}s")
            time.sleep(sleep_time)
    # Final attempt: errors are no longer swallowed.
    return fn()
class CalledProcessError(SubprocessError):
    """Error raised by run() with check=True when the child process exits
    with a non-zero status.

    Attributes:
      cmd, returncode, stdout, stderr, output
    """

    def __init__(self, returncode, cmd, output=None, stderr=None):
        self.returncode, self.cmd = returncode, cmd
        self.output, self.stderr = output, stderr

    def __str__(self):
        status = self.returncode
        # Zero, None and positive statuses are ordinary exits; only a negative
        # status is reported as death by signal.
        if not (status and status < 0):
            return "Command '%s' returned non-zero exit status %d." % (
                self.cmd, status)
        signum = -status
        try:
            return "Command '%s' died with %r." % (
                self.cmd, signal.Signals(signum))
        except ValueError:
            # The number does not correspond to a known signal.
            return "Command '%s' died with unknown signal %d." % (
                self.cmd, signum)

    @property
    def stdout(self):
        """Read-through alias for `output`, mirroring `stderr`."""
        return self.output

    @stdout.setter
    def stdout(self, value):
        # Writable so that .stdout behaves as a transparent alias of .output.
        self.output = value
Raised when run() is called with check=True and the process returns a non-zero exit status.
Attributes: cmd, returncode, stdout, stderr, output
def runv(
    args: Sequence[Path | str],
    *,
    cwd: Path | None = None,
    env: dict[str, str] | None = None,
    stdin: None | int | IO[bytes] | bytes = None,
    stdout: None | int | IO[bytes] = None,
    stderr: None | int | IO[bytes] = None,
) -> subprocess.CompletedProcess:
    """Run a subprocess, echoing the command line first.

    The shell-quoted command is printed to stderr, prefixed with `$`, before
    the subprocess is started.

    Args:
        args: The program to run followed by its arguments, as strings or
            paths.
        cwd: Directory to switch to before launching the process, if any.
        env: Environment for the process. It replaces the current environment
            entirely rather than extending it; pass something like
            `env=dict(os.environ, KEY=VAL, ...)` to extend instead. When
            omitted, the current process's environment is used.
        stdin: Either an IO handle to connect to the process's stdin, or a
            byte string to feed to it.
        stdout: IO handle to receive the process's stdout, if any.
        stderr: IO handle to receive the process's stderr, if any.

    Returns:
        The `subprocess.CompletedProcess` for the finished process.

    Raises:
        OSError: The process could not be started, e.g. because the program
            does not exist.
        CalledProcessError: The process exited with a non-zero exit status.
    """
    print("$", ui.shell_quote(args), file=sys.stderr)

    # Bytes travel through `input`; any other value is a handle for `stdin`.
    if isinstance(stdin, bytes):
        payload, handle = stdin, None
    else:
        payload, handle = None, stdin

    return subprocess.run(
        args,
        cwd=cwd,
        env=env,
        input=payload,
        stdin=handle,
        stdout=stdout,
        stderr=stderr,
        check=True,
    )
Verbosely run a subprocess.
A description of the subprocess will be written to stderr before the subprocess is executed.
Args:
args: A list of strings or paths describing the program to run and
the arguments to pass to it.
cwd: An optional directory to change into before executing the process.
env: A replacement environment with which to launch the process. If
unspecified, the current process's environment is used. Replacement
occurs wholesale, so use a construction like
env=dict(os.environ, KEY=VAL, ...)
to instead amend the existing
environment.
stdin: An optional IO handle or byte string to use as the process's
stdin stream.
stdout: An optional IO handle to use as the process's stdout stream.
stderr: An optional IO handle to use as the process's stderr stream.
Raises: OSError: The process cannot be executed, e.g. because the specified program does not exist. CalledProcessError: The process exited with a non-zero exit status.
88def capture( 89 args: Sequence[Path | str], 90 *, 91 cwd: Path | None = None, 92 env: dict[str, str] | None = None, 93 stdin: None | int | IO[bytes] | str = None, 94 stderr: None | int | IO[bytes] = None, 95) -> str: 96 """Capture the output of a subprocess. 97 98 Args: 99 args: A list of strings or paths describing the program to run and 100 the arguments to pass to it. 101 cwd: An optional directory to change into before executing the process. 102 env: A replacement environment with which to launch the process. If 103 unspecified, the current process's environment is used. Replacement 104 occurs wholesale, so use a construction like 105 `env=dict(os.environ, KEY=VAL, ...)` to instead amend the existing 106 environment. 107 stdin: An optional IO handle, byte string or string to use as the process's 108 stdin stream. 109 stderr: An optional IO handle to use as the process's stderr stream. 110 111 Returns: 112 output: The verbatim output of the process as a string. Note that 113 trailing whitespace is preserved. 114 115 Raises: 116 OSError: The process cannot be executed, e.g. because the specified 117 program does not exist. 118 CalledProcessError: The process exited with a non-zero exit status. 119 120 .. tip:: Many programs produce output with a trailing newline. 121 You may want to call `strip()` on the output to remove any trailing 122 whitespace. 123 """ 124 input = None 125 if isinstance(stdin, str): 126 input = stdin 127 stdin = None 128 129 return subprocess.check_output( 130 args, cwd=cwd, env=env, input=input, stdin=stdin, stderr=stderr, text=True 131 )
Capture the output of a subprocess.
Args:
args: A list of strings or paths describing the program to run and
the arguments to pass to it.
cwd: An optional directory to change into before executing the process.
env: A replacement environment with which to launch the process. If
unspecified, the current process's environment is used. Replacement
occurs wholesale, so use a construction like
env=dict(os.environ, KEY=VAL, ...)
to instead amend the existing
environment.
stdin: An optional IO handle or string to use as the process's
stdin stream.
stderr: An optional IO handle to use as the process's stderr stream.
Returns: output: The verbatim output of the process as a string. Note that trailing whitespace is preserved.
Raises: OSError: The process cannot be executed, e.g. because the specified program does not exist. CalledProcessError: The process exited with a non-zero exit status.
.. tip:: Many programs produce output with a trailing newline.
You may want to call strip()
on the output to remove any trailing
whitespace.
def run_and_get_return_code(
    args: Sequence[Path | str],
    *,
    cwd: Path | None = None,
    env: dict[str, str] | None = None,
) -> int:
    """Run a subprocess and report its exit status instead of raising.

    The process's stdout is captured and discarded, and its stderr is sent to
    the null device.

    Returns:
        0 when the process succeeds, otherwise the `returncode` carried by the
        `CalledProcessError` that `capture` raised.
    """
    try:
        capture(args, cwd=cwd, env=env, stderr=subprocess.DEVNULL)
    except CalledProcessError as failure:
        return failure.returncode
    else:
        return 0
Run a subprocess and return the return code.
def run_with_retries(fn: Callable[[], T], max_duration: int = 60) -> T:
    """Retry a function until it doesn't raise a `CalledProcessError`, using
    exponential backoff until `max_duration` is reached.

    `fn` is retried `ceil(log2(max_duration))` times, sleeping 1s, 2s, 4s, ...
    between attempts, followed by one final attempt whose exception (if any)
    propagates to the caller. The total sleep time is therefore
    `2**retries - 1` seconds, which can slightly exceed `max_duration` when it
    is not a power of two (e.g. 63s for the default of 60). A `max_duration`
    of 1 or less performs a single attempt with no retries.

    Exceptions other than `CalledProcessError` are never retried.
    """
    # `math.log2` is undefined for values <= 0, so treat any duration that
    # leaves no room for backoff as "no retries" rather than crashing.
    retries = math.ceil(math.log2(max_duration)) if max_duration > 1 else 0
    for retry in range(retries):
        try:
            return fn()
        except subprocess.CalledProcessError as e:
            sleep_time = 2**retry
            print(f"Failed: {e}, retrying in {sleep_time}s")
            time.sleep(sleep_time)
    # Final attempt: errors are no longer swallowed.
    return fn()
Retry a function until it doesn't raise a CalledProcessError, using exponential backoff until max_duration is reached.