misc.python.materialize.parallel_task

Run shell commands in parallel with timing and status reporting.

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10"""Run shell commands in parallel with timing and status reporting."""
 11
 12from __future__ import annotations
 13
 14import queue
 15import subprocess
 16import sys
 17import threading
 18import time
 19from collections.abc import Callable
 20from datetime import datetime, timedelta
 21
 22from materialize import buildkite
 23from materialize.terminal import (
 24    COLOR_ERROR,
 25    COLOR_OK,
 26    STYLE_BOLD,
 27    with_formatting,
 28    with_formattings,
 29)
 30
 31TaskSpec = list[str] | Callable[[], tuple[bool, str]]
 32
 33OK = with_formatting("✓", COLOR_OK)
 34FAIL = with_formattings("✗", [COLOR_ERROR, STYLE_BOLD])
 35
 36
 37def _prefix(ci: str = "---") -> str:
 38    return ci + " " if buildkite.is_in_buildkite() else ""
 39
 40
 41class TaskThread(threading.Thread):
 42    """Runs a shell command or callable in a thread, capturing output, duration, and exit status."""
 43
 44    def __init__(
 45        self,
 46        name: str,
 47        spec: TaskSpec | None = None,
 48        command: list[str] | None = None,
 49        done_queue: queue.Queue[TaskThread] | None = None,
 50    ):
 51        super().__init__()
 52        self.name = name
 53        self.output: str = ""
 54        self.success = False
 55        self.duration: timedelta = timedelta()
 56        self._done_queue = done_queue
 57        resolved = command if spec is None else spec
 58        assert resolved is not None, "must provide spec or command"
 59        if callable(resolved):
 60            self._fn: Callable[[], tuple[bool, str]] | None = resolved
 61            self._command: list[str] | None = None
 62        else:
 63            self._fn = None
 64            self._command = resolved
 65
 66    def run(self) -> None:
 67        start = datetime.now()
 68        try:
 69            if self._fn is not None:
 70                self.success, self.output = self._fn()
 71            else:
 72                assert self._command is not None
 73                proc = subprocess.Popen(
 74                    self._command,
 75                    stdout=subprocess.PIPE,
 76                    stderr=subprocess.STDOUT,
 77                )
 78                stdout, _ = proc.communicate()
 79                self.success = proc.returncode == 0
 80                self.output = stdout.decode("utf-8").strip()
 81        except Exception as e:
 82            self.output = str(e)
 83            self.success = False
 84        self.duration = datetime.now() - start
 85        if self._done_queue is not None:
 86            self._done_queue.put(self)
 87
 88
 89class _SpinnerThread(threading.Thread):
 90    def __init__(self, remaining: int, suffix: str = "tasks") -> None:
 91        super().__init__(daemon=True)
 92        self._remaining = remaining
 93        self._suffix = suffix
 94        # "checks in foo" -> "check in foo"
 95        parts = suffix.split(" ", 1)
 96        self._singular = parts[0].rstrip("s") + (
 97            " " + parts[1] if len(parts) > 1 else ""
 98        )
 99        self._lock = threading.Lock()
100        self.active = not buildkite.is_in_buildkite() and sys.stdout.isatty()
101
102    def run(self) -> None:
103        symbols = ["⣾", "⣷", "⣯", "⣟", "⡿", "⢿", "⣻", "⣽"]
104        i = 0
105        while self.active:
106            with self._lock:
107                remaining = self._remaining
108            suffix = self._suffix if remaining != 1 else self._singular
109            print(
110                f"\r\033[K{symbols[i]} {remaining} {suffix}",
111                end="",
112                flush=True,
113            )
114            i = (i + 1) % len(symbols)
115            time.sleep(0.1)
116
117    def task_done(self) -> None:
118        with self._lock:
119            self._remaining -= 1
120
121    def clear_line(self) -> None:
122        if self.active:
123            print("\r\033[K", end="", flush=True)
124
125    def stop(self) -> None:
126        if self.active:
127            self.active = False
128            print("\r\033[K", end="", flush=True)
129
130
def run_parallel(
    tasks: list[tuple[str, TaskSpec]],
    verbose: bool = False,
    print_duration: bool = True,
    spinner_suffix: str = "tasks",
    print_summary: bool = True,
) -> list[str]:
    """Run tasks in parallel and print results as each task finishes.

    Every task gets its own thread; results are reported in completion
    order, not in the order given.

    Args:
        tasks: List of (name, spec) pairs.
        verbose: If True, print output even for successful tasks.
        print_duration: If True, include duration in status lines.
        spinner_suffix: Suffix after the count in the spinner, e.g. "tasks".
        print_summary: If True, print a final success/failure summary.

    Returns:
        List of failed task names (empty on full success).
    """
    done_q: queue.Queue[TaskThread] = queue.Queue()
    threads = [TaskThread(name, spec, done_queue=done_q) for name, spec in tasks]

    spinner = _SpinnerThread(len(threads), spinner_suffix)
    spinner.start()

    failed: list[str] = []
    try:
        for t in threads:
            t.start()

        # One queue item is expected per started thread.
        for _ in threads:
            t = done_q.get()
            spinner.task_done()
            # Erase the spinner line so the status line starts on a clean row.
            spinner.clear_line()
            formatted_duration = (
                f" [{t.duration.total_seconds():5.2f}s]" if print_duration else ""
            )
            if t.success:
                print(f"{_prefix('---')}{OK}{formatted_duration} {t.name}")
            else:
                print(f"{_prefix('+++')}{FAIL}{formatted_duration} {t.name}")
                failed.append(t.name)
            # Output of successful tasks is only shown in verbose mode.
            if t.output and (not t.success or verbose):
                print(t.output)
    finally:
        # Stop the spinner even if we are interrupted while waiting, so it
        # does not keep redrawing over whatever is printed next.
        spinner.stop()

    if print_summary:
        if failed:
            print(f"{_prefix('+++')}{FAIL} Failed: {failed}")
        else:
            print(f"{_prefix('+++')}{OK} All tasks successful")

    return failed
TaskSpec = list[str] | collections.abc.Callable[[], tuple[bool, str]]
OK = '\x1b[92m✓\x1b[0m'
FAIL = '\x1b[91m\x1b[1m✗\x1b[0m'
class TaskThread(threading.Thread):
class TaskThread(threading.Thread):
    """Runs a shell command or callable in a thread, capturing output, duration, and exit status.

    Once the thread has finished, ``success``, ``output`` and ``duration``
    hold the result. If a ``done_queue`` was supplied, the thread always puts
    itself on that queue when it finishes, regardless of how the task ended.
    """

    def __init__(
        self,
        name: str,
        spec: TaskSpec | None = None,
        command: list[str] | None = None,
        done_queue: queue.Queue[TaskThread] | None = None,
    ):
        """Prepare (but do not start) a task.

        Args:
            name: Name assigned to the thread.
            spec: Either an argv list to run as a subprocess or a callable
                returning ``(success, output)``. Takes precedence over
                ``command`` when both are given.
            command: Argv list; only consulted when ``spec`` is None.
            done_queue: Optional queue that receives this thread when it is done.

        Raises:
            ValueError: If neither ``spec`` nor ``command`` is provided.
        """
        super().__init__()
        self.name = name
        self.output: str = ""
        self.success = False
        self.duration: timedelta = timedelta()
        self._done_queue = done_queue
        resolved = command if spec is None else spec
        if resolved is None:
            # Explicit raise instead of assert: asserts are stripped under -O.
            raise ValueError("must provide spec or command")
        if callable(resolved):
            self._fn: Callable[[], tuple[bool, str]] | None = resolved
            self._command: list[str] | None = None
        else:
            self._fn = None
            self._command = resolved

    def run(self) -> None:
        """Execute the task and record ``success``, ``output`` and ``duration``."""
        # Monotonic clock: wall-clock adjustments must not skew the duration.
        start = time.monotonic()
        try:
            if self._fn is not None:
                self.success, self.output = self._fn()
            else:
                assert self._command is not None
                proc = subprocess.Popen(
                    self._command,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                )
                stdout, _ = proc.communicate()
                self.success = proc.returncode == 0
                # errors="replace": undecodable bytes in the output must not
                # turn a successful command into a reported failure.
                self.output = stdout.decode("utf-8", errors="replace").strip()
        except Exception as e:
            # Fall back to repr() for exceptions with an empty message.
            self.output = str(e) or repr(e)
            self.success = False
        except SystemExit as e:
            # A callable that calls sys.exit() only ends this thread; record
            # it as a failure instead of letting it vanish silently.
            self.output = repr(e)
            self.success = False
        finally:
            # Always publish completion, otherwise a consumer blocking on the
            # queue would wait forever for this task.
            self.duration = timedelta(seconds=time.monotonic() - start)
            if self._done_queue is not None:
                self._done_queue.put(self)

Runs a shell command or callable in a thread, capturing output, duration, and exit status.

TaskThread( name: str, spec: list[str] | Callable[[], tuple[bool, str]] | None = None, command: list[str] | None = None, done_queue: queue.Queue[TaskThread] | None = None)
45    def __init__(
46        self,
47        name: str,
48        spec: TaskSpec | None = None,
49        command: list[str] | None = None,
50        done_queue: queue.Queue[TaskThread] | None = None,
51    ):
52        super().__init__()
53        self.name = name
54        self.output: str = ""
55        self.success = False
56        self.duration: timedelta = timedelta()
57        self._done_queue = done_queue
58        resolved = command if spec is None else spec
59        assert resolved is not None, "must provide spec or command"
60        if callable(resolved):
61            self._fn: Callable[[], tuple[bool, str]] | None = resolved
62            self._command: list[str] | None = None
63        else:
64            self._fn = None
65            self._command = resolved

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

name
1181    @property
1182    def name(self):
1183        """A string used for identification purposes only.
1184
1185        It has no semantics. Multiple threads may be given the same name. The
1186        initial name is set by the constructor.
1187
1188        """
1189        assert self._initialized, "Thread.__init__() not called"
1190        return self._name

A string used for identification purposes only.

It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.

output: str
success
duration: datetime.timedelta
def run(self) -> None:
67    def run(self) -> None:
68        start = datetime.now()
69        try:
70            if self._fn is not None:
71                self.success, self.output = self._fn()
72            else:
73                assert self._command is not None
74                proc = subprocess.Popen(
75                    self._command,
76                    stdout=subprocess.PIPE,
77                    stderr=subprocess.STDOUT,
78                )
79                stdout, _ = proc.communicate()
80                self.success = proc.returncode == 0
81                self.output = stdout.decode("utf-8").strip()
82        except Exception as e:
83            self.output = str(e)
84            self.success = False
85        self.duration = datetime.now() - start
86        if self._done_queue is not None:
87            self._done_queue.put(self)

Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

def run_parallel( tasks: list[tuple[str, list[str] | Callable[[], tuple[bool, str]]]], verbose: bool = False, print_duration: bool = True, spinner_suffix: str = 'tasks', print_summary: bool = True) -> list[str]:
def run_parallel(
    tasks: list[tuple[str, TaskSpec]],
    verbose: bool = False,
    print_duration: bool = True,
    spinner_suffix: str = "tasks",
    print_summary: bool = True,
) -> list[str]:
    """Run tasks in parallel and print results as each task finishes.

    Every task gets its own thread; results are reported in completion
    order, not in the order given.

    Args:
        tasks: List of (name, spec) pairs.
        verbose: If True, print output even for successful tasks.
        print_duration: If True, include duration in status lines.
        spinner_suffix: Suffix after the count in the spinner, e.g. "tasks".
        print_summary: If True, print a final success/failure summary.

    Returns:
        List of failed task names (empty on full success).
    """
    done_q: queue.Queue[TaskThread] = queue.Queue()
    threads = [TaskThread(name, spec, done_queue=done_q) for name, spec in tasks]

    spinner = _SpinnerThread(len(threads), spinner_suffix)
    spinner.start()

    failed: list[str] = []
    try:
        for t in threads:
            t.start()

        # One queue item is expected per started thread.
        for _ in threads:
            t = done_q.get()
            spinner.task_done()
            # Erase the spinner line so the status line starts on a clean row.
            spinner.clear_line()
            formatted_duration = (
                f" [{t.duration.total_seconds():5.2f}s]" if print_duration else ""
            )
            if t.success:
                print(f"{_prefix('---')}{OK}{formatted_duration} {t.name}")
            else:
                print(f"{_prefix('+++')}{FAIL}{formatted_duration} {t.name}")
                failed.append(t.name)
            # Output of successful tasks is only shown in verbose mode.
            if t.output and (not t.success or verbose):
                print(t.output)
    finally:
        # Stop the spinner even if we are interrupted while waiting, so it
        # does not keep redrawing over whatever is printed next.
        spinner.stop()

    if print_summary:
        if failed:
            print(f"{_prefix('+++')}{FAIL} Failed: {failed}")
        else:
            print(f"{_prefix('+++')}{OK} All tasks successful")

    return failed

Run tasks in parallel and print results as each task finishes.

Args: tasks: List of (name, spec) pairs. verbose: If True, print output even for successful tasks. print_duration: If True, include duration in status lines. spinner_suffix: Suffix after the count in the spinner, e.g. "tasks". print_summary: If True, print a final success/failure summary.

Returns: List of failed task names (empty on full success).