misc.python.materialize.spawn

Utilities for spawning processes.

The functions in this module are a convenient high-level interface to the operations provided by the standard subprocess module.

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10"""Utilities for spawning processes.
 11
 12The functions in this module are a convenient high-level interface to the
 13operations provided by the standard [`subprocess`][subprocess] module.
 14
 15[subprocess]: https://docs.python.org/3/library/subprocess.html
 16"""
 17
 18import math
 19import subprocess
 20import sys
 21import time
 22from collections.abc import Callable, Sequence
 23from pathlib import Path
 24from typing import IO, TypeVar
 25
 26from materialize import ui
 27
# Alias of `subprocess.CalledProcessError`, making the exception type
# available as an attribute of this module.
CalledProcessError = subprocess.CalledProcessError
 29
 30
 31# NOTE(benesch): Please think twice before adding additional parameters to this
 32# method! It is meant to serve 95% of callers with a small and understandable
 33# set of parameters. If your needs are niche, consider calling `subprocess.run`
 34# directly rather than adding a one-off parameter here.
 35def runv(
 36    args: Sequence[Path | str],
 37    *,
 38    cwd: Path | None = None,
 39    env: dict[str, str] | None = None,
 40    stdin: None | int | IO[bytes] | bytes = None,
 41    stdout: None | int | IO[bytes] = None,
 42    stderr: None | int | IO[bytes] = None,
 43) -> subprocess.CompletedProcess:
 44    """Verbosely run a subprocess.
 45
 46    A description of the subprocess will be written to stdout before the
 47    subprocess is executed.
 48
 49    Args:
 50        args: A list of strings or paths describing the program to run and
 51            the arguments to pass to it.
 52        cwd: An optional directory to change into before executing the process.
 53        env: A replacement environment with which to launch the process. If
 54            unspecified, the current process's environment is used. Replacement
 55            occurs wholesale, so use a construction like
 56            `env=dict(os.environ, KEY=VAL, ...)` to instead amend the existing
 57            environment.
 58        stdin: An optional IO handle or byte string to use as the process's
 59            stdin stream.
 60        stdout: An optional IO handle to use as the process's stdout stream.
 61        stderr: An optional IO handle to use as the process's stderr stream.
 62
 63    Raises:
 64        OSError: The process cannot be executed, e.g. because the specified
 65            program does not exist.
 66        CalledProcessError: The process exited with a non-zero exit status.
 67    """
 68    print("$", ui.shell_quote(args), file=sys.stderr)
 69
 70    input = None
 71    if isinstance(stdin, bytes):
 72        input = stdin
 73        stdin = None
 74
 75    return subprocess.run(
 76        args,
 77        cwd=cwd,
 78        env=env,
 79        input=input,
 80        stdin=stdin,
 81        stdout=stdout,
 82        stderr=stderr,
 83        check=True,
 84    )
 85
 86
 87def capture(
 88    args: Sequence[Path | str],
 89    *,
 90    cwd: Path | None = None,
 91    env: dict[str, str] | None = None,
 92    stdin: None | int | IO[bytes] | str = None,
 93    stderr: None | int | IO[bytes] = None,
 94) -> str:
 95    """Capture the output of a subprocess.
 96
 97    Args:
 98        args: A list of strings or paths describing the program to run and
 99            the arguments to pass to it.
100        cwd: An optional directory to change into before executing the process.
101        env: A replacement environment with which to launch the process. If
102            unspecified, the current process's environment is used. Replacement
103            occurs wholesale, so use a construction like
104            `env=dict(os.environ, KEY=VAL, ...)` to instead amend the existing
105            environment.
106        stdin: An optional IO handle, byte string or string to use as the process's
107            stdin stream.
108        stderr: An optional IO handle to use as the process's stderr stream.
109
110    Returns:
111        output: The verbatim output of the process as a string. Note that
112            trailing whitespace is preserved.
113
114    Raises:
115        OSError: The process cannot be executed, e.g. because the specified
116            program does not exist.
117        CalledProcessError: The process exited with a non-zero exit status.
118
119    .. tip:: Many programs produce output with a trailing newline.
120        You may want to call `strip()` on the output to remove any trailing
121        whitespace.
122    """
123    input = None
124    if isinstance(stdin, str):
125        input = stdin
126        stdin = None
127
128    return subprocess.check_output(
129        args, cwd=cwd, env=env, input=input, stdin=stdin, stderr=stderr, text=True
130    )
131
132
133def run_and_get_return_code(
134    args: Sequence[Path | str],
135    *,
136    cwd: Path | None = None,
137    env: dict[str, str] | None = None,
138) -> int:
139    """Run a subprocess and return the return code."""
140    try:
141        capture(args, cwd=cwd, env=env, stderr=subprocess.DEVNULL)
142        return 0
143    except CalledProcessError as e:
144        return e.returncode
145
146
T = TypeVar("T")  # Generic type variable


def run_with_retries(fn: Callable[[], T], max_duration: int = 60) -> T:
    """Call `fn`, retrying with exponential backoff on `CalledProcessError`.

    Up to `ceil(log2(max_duration))` attempts are each followed, on failure,
    by a sleep of 1s, 2s, 4s, ... and then one final attempt is made. The
    total time slept can therefore slightly exceed `max_duration` (the
    default of 60 sleeps for up to 63 seconds).

    Args:
        fn: A zero-argument callable to invoke.
        max_duration: Approximate retry budget in seconds. A value of 1 or
            less means `fn` is called exactly once, with no retries.

    Returns:
        Whatever `fn` returns on its first successful call.

    Raises:
        CalledProcessError: The final attempt failed.
    """
    # `math.log2` is undefined for values <= 0; such a budget leaves no room
    # for retries, so go straight to the single final attempt.
    retries = math.ceil(math.log2(max_duration)) if max_duration > 0 else 0
    for retry in range(retries):
        try:
            return fn()
        except subprocess.CalledProcessError as e:
            sleep_time = 2**retry
            print(f"Failed: {e}, retrying in {sleep_time}s")
            time.sleep(sleep_time)
    # Last attempt: any exception propagates to the caller.
    return fn()
class CalledProcessError(subprocess.SubprocessError):
    """Raised when run() is called with check=True and the process
    returns a non-zero exit status.

    Attributes:
      returncode: the exit status that was passed in; a negative value is
        reported by ``str()`` as death by that signal number.
      cmd: the command that was run.
      output: the output that was passed in, if any.
      stderr: the stderr that was passed in, if any.
      stdout: read/write alias for ``output``.
    """

    def __init__(self, returncode, cmd, output=None, stderr=None):
        self.returncode, self.cmd = returncode, cmd
        self.output, self.stderr = output, stderr

    def __str__(self):
        killed_by_signal = self.returncode and self.returncode < 0
        if not killed_by_signal:
            return "Command '%s' returned non-zero exit status %d." % (
                self.cmd, self.returncode)
        # Negative status: name the signal if it is a known one.
        try:
            return "Command '%s' died with %r." % (
                self.cmd, signal.Signals(-self.returncode))
        except ValueError:
            return "Command '%s' died with unknown signal %d." % (
                self.cmd, -self.returncode)

    @property
    def stdout(self):
        """Alias for output attribute, to match stderr"""
        return self.output

    @stdout.setter
    def stdout(self, value):
        # Setting it is unusual, but permitted so that .stdout stays a
        # transparent alias for .output
        self.output = value

Raised when run() is called with check=True and the process returns a non-zero exit status.

Attributes: cmd, returncode, stdout, stderr, output

CalledProcessError(returncode, cmd, output=None, stderr=None)
    def __init__(self, returncode, cmd, output=None, stderr=None):
        """Store the exit status, the command and any output on the instance."""
        self.returncode = returncode
        self.cmd = cmd
        self.output = output
        self.stderr = stderr
returncode
cmd
output
stderr
stdout
    @property
    def stdout(self):
        """Alias for the `output` attribute, named to match `stderr`."""
        return self.output

Alias for output attribute, to match stderr

def runv( args: Sequence[pathlib.Path | str], *, cwd: pathlib.Path | None = None, env: dict[str, str] | None = None, stdin: Union[NoneType, int, IO[bytes], bytes] = None, stdout: Union[NoneType, int, IO[bytes]] = None, stderr: Union[NoneType, int, IO[bytes]] = None) -> subprocess.CompletedProcess:
def runv(
    args: Sequence[Path | str],
    *,
    cwd: Path | None = None,
    env: dict[str, str] | None = None,
    stdin: None | int | IO[bytes] | bytes = None,
    stdout: None | int | IO[bytes] = None,
    stderr: None | int | IO[bytes] = None,
) -> subprocess.CompletedProcess:
    """Run a subprocess, announcing the command before it starts.

    A `$`-prefixed line containing `ui.shell_quote(args)` is printed to this
    process's stderr, then the subprocess is run to completion.

    Args:
        args: The program to run followed by its arguments, as strings or
            paths.
        cwd: Directory to run the process in, if given.
        env: A complete replacement environment for the process. When
            omitted, the current process's environment is used. Nothing is
            merged, so pass `env=dict(os.environ, KEY=VAL, ...)` to extend
            the existing environment instead of replacing it.
        stdin: Either a byte string to feed to the process as its input, or
            an IO handle to use as its stdin stream.
        stdout: An optional IO handle to use as the process's stdout stream.
        stderr: An optional IO handle to use as the process's stderr stream.

    Returns:
        The `subprocess.CompletedProcess` produced by `subprocess.run`.

    Raises:
        OSError: The process cannot be executed, e.g. because the specified
            program does not exist.
        CalledProcessError: The process exited with a non-zero exit status.
    """
    print("$", ui.shell_quote(args), file=sys.stderr)

    # `subprocess.run` takes literal input data and a stream handle through
    # two different keyword arguments; route `stdin` to the one that applies.
    if isinstance(stdin, bytes):
        payload, handle = stdin, None
    else:
        payload, handle = None, stdin

    return subprocess.run(
        args,
        check=True,
        cwd=cwd,
        env=env,
        input=payload,
        stdin=handle,
        stdout=stdout,
        stderr=stderr,
    )

Verbosely run a subprocess.

A description of the subprocess will be written to stderr before the subprocess is executed.

Args: args: A list of strings or paths describing the program to run and the arguments to pass to it. cwd: An optional directory to change into before executing the process. env: A replacement environment with which to launch the process. If unspecified, the current process's environment is used. Replacement occurs wholesale, so use a construction like env=dict(os.environ, KEY=VAL, ...) to instead amend the existing environment. stdin: An optional IO handle or byte string to use as the process's stdin stream. stdout: An optional IO handle to use as the process's stdout stream. stderr: An optional IO handle to use as the process's stderr stream.

Raises: OSError: The process cannot be executed, e.g. because the specified program does not exist. CalledProcessError: The process exited with a non-zero exit status.

def capture( args: Sequence[pathlib.Path | str], *, cwd: pathlib.Path | None = None, env: dict[str, str] | None = None, stdin: Union[NoneType, int, IO[bytes], str] = None, stderr: Union[NoneType, int, IO[bytes]] = None) -> str:
 88def capture(
 89    args: Sequence[Path | str],
 90    *,
 91    cwd: Path | None = None,
 92    env: dict[str, str] | None = None,
 93    stdin: None | int | IO[bytes] | str = None,
 94    stderr: None | int | IO[bytes] = None,
 95) -> str:
 96    """Capture the output of a subprocess.
 97
 98    Args:
 99        args: A list of strings or paths describing the program to run and
100            the arguments to pass to it.
101        cwd: An optional directory to change into before executing the process.
102        env: A replacement environment with which to launch the process. If
103            unspecified, the current process's environment is used. Replacement
104            occurs wholesale, so use a construction like
105            `env=dict(os.environ, KEY=VAL, ...)` to instead amend the existing
106            environment.
107        stdin: An optional IO handle, byte string or string to use as the process's
108            stdin stream.
109        stderr: An optional IO handle to use as the process's stderr stream.
110
111    Returns:
112        output: The verbatim output of the process as a string. Note that
113            trailing whitespace is preserved.
114
115    Raises:
116        OSError: The process cannot be executed, e.g. because the specified
117            program does not exist.
118        CalledProcessError: The process exited with a non-zero exit status.
119
120    .. tip:: Many programs produce output with a trailing newline.
121        You may want to call `strip()` on the output to remove any trailing
122        whitespace.
123    """
124    input = None
125    if isinstance(stdin, str):
126        input = stdin
127        stdin = None
128
129    return subprocess.check_output(
130        args, cwd=cwd, env=env, input=input, stdin=stdin, stderr=stderr, text=True
131    )

Capture the output of a subprocess.

Args: args: A list of strings or paths describing the program to run and the arguments to pass to it. cwd: An optional directory to change into before executing the process. env: A replacement environment with which to launch the process. If unspecified, the current process's environment is used. Replacement occurs wholesale, so use a construction like env=dict(os.environ, KEY=VAL, ...) to instead amend the existing environment. stdin: An optional IO handle or string to use as the process's stdin stream. stderr: An optional IO handle to use as the process's stderr stream.

Returns: output: The verbatim output of the process as a string. Note that trailing whitespace is preserved.

Raises: OSError: The process cannot be executed, e.g. because the specified program does not exist. CalledProcessError: The process exited with a non-zero exit status.

.. tip:: Many programs produce output with a trailing newline. You may want to call strip() on the output to remove any trailing whitespace.

def run_and_get_return_code( args: Sequence[pathlib.Path | str], *, cwd: pathlib.Path | None = None, env: dict[str, str] | None = None) -> int:
134def run_and_get_return_code(
135    args: Sequence[Path | str],
136    *,
137    cwd: Path | None = None,
138    env: dict[str, str] | None = None,
139) -> int:
140    """Run a subprocess and return the return code."""
141    try:
142        capture(args, cwd=cwd, env=env, stderr=subprocess.DEVNULL)
143        return 0
144    except CalledProcessError as e:
145        return e.returncode

Run a subprocess and return the return code.

def run_with_retries(fn: Callable[[], ~T], max_duration: int = 60) -> ~T:
def run_with_retries(fn: Callable[[], T], max_duration: int = 60) -> T:
    """Call `fn`, retrying with exponential backoff on `CalledProcessError`.

    Up to `ceil(log2(max_duration))` attempts are each followed, on failure,
    by a sleep of 1s, 2s, 4s, ... and then one final attempt is made. The
    total time slept can therefore slightly exceed `max_duration` (the
    default of 60 sleeps for up to 63 seconds).

    Args:
        fn: A zero-argument callable to invoke.
        max_duration: Approximate retry budget in seconds. A value of 1 or
            less means `fn` is called exactly once, with no retries.

    Returns:
        Whatever `fn` returns on its first successful call.

    Raises:
        CalledProcessError: The final attempt failed.
    """
    # `math.log2` is undefined for values <= 0; such a budget leaves no room
    # for retries, so go straight to the single final attempt.
    retries = math.ceil(math.log2(max_duration)) if max_duration > 0 else 0
    for retry in range(retries):
        try:
            return fn()
        except subprocess.CalledProcessError as e:
            sleep_time = 2**retry
            print(f"Failed: {e}, retrying in {sleep_time}s")
            time.sleep(sleep_time)
    # Last attempt: any exception propagates to the caller.
    return fn()

Retry a function until it doesn't raise a CalledProcessError, using exponential backoff until max_duration is reached.