misc.python.materialize.parallel_task
Run shell commands in parallel with timing and status reporting.
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Run shell commands in parallel with timing and status reporting."""

from __future__ import annotations

import queue
import subprocess
import sys
import threading
import time
from collections.abc import Callable
from datetime import datetime, timedelta

from materialize import buildkite
from materialize.terminal import (
    COLOR_ERROR,
    COLOR_OK,
    STYLE_BOLD,
    with_formatting,
    with_formattings,
)

# A task is either an argv list to execute or a zero-argument callable
# returning ``(success, output)``.
TaskSpec = list[str] | Callable[[], tuple[bool, str]]

OK = with_formatting("✓", COLOR_OK)
FAIL = with_formattings("✗", [COLOR_ERROR, STYLE_BOLD])


def _prefix(ci: str = "---") -> str:
    """Return ``ci`` plus a space when running in Buildkite, else ``""``."""
    return ci + " " if buildkite.is_in_buildkite() else ""


class TaskThread(threading.Thread):
    """Runs a shell command or callable in a thread, capturing output, duration, and exit status."""

    def __init__(
        self,
        name: str,
        spec: TaskSpec | None = None,
        command: list[str] | None = None,
        done_queue: queue.Queue[TaskThread] | None = None,
    ):
        """Create a task thread.

        Args:
            name: Thread name, also used as the task's display name.
            spec: Command (argv list) or callable to run; takes precedence
                over ``command`` when both are given.
            command: Command (argv list), used only when ``spec`` is None.
            done_queue: If given, the thread puts itself on this queue when
                ``run`` finishes.
        """
        super().__init__()
        self.name = name
        self.output: str = ""
        self.success = False
        self.duration: timedelta = timedelta()
        self._done_queue = done_queue
        resolved = command if spec is None else spec
        assert resolved is not None, "must provide spec or command"
        if callable(resolved):
            self._fn: Callable[[], tuple[bool, str]] | None = resolved
            self._command: list[str] | None = None
        else:
            self._fn = None
            self._command = resolved

    def run(self) -> None:
        """Execute the task and record ``success``, ``output`` and ``duration``.

        Exceptions derived from ``Exception`` are turned into a failed result
        whose output is the exception message.  Whatever happens, the thread
        is put on ``done_queue`` (when one was given) so that a consumer
        waiting for one item per task cannot block forever.
        """
        start = datetime.now()
        try:
            if self._fn is not None:
                self.success, self.output = self._fn()
            else:
                assert self._command is not None
                proc = subprocess.run(
                    self._command,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                )
                self.success = proc.returncode == 0
                # Undecodable bytes must not turn a successful command into
                # a failure, so replace them instead of raising.
                self.output = proc.stdout.decode("utf-8", errors="replace").strip()
        except Exception as e:
            self.output = str(e)
            self.success = False
        except BaseException as e:
            # e.g. SystemExit raised by a callable: record the failure and
            # let the threading machinery deal with the exception itself.
            self.output = f"{type(e).__name__}: {e}"
            self.success = False
            raise
        finally:
            self.duration = datetime.now() - start
            if self._done_queue is not None:
                self._done_queue.put(self)


class _SpinnerThread(threading.Thread):
    """Daemon thread that redraws a "<spinner> <remaining> <suffix>" status line."""

    def __init__(self, remaining: int, suffix: str = "tasks") -> None:
        """Set up the spinner.

        Args:
            remaining: Initial number of outstanding tasks.
            suffix: Text shown after the count; its first word is
                singularized (one trailing "s" dropped) when the count is 1.
        """
        super().__init__(daemon=True)
        self._remaining = remaining
        self._suffix = suffix
        # "checks in foo" -> "check in foo"; drop at most one trailing "s"
        # so that a word such as "class" is not reduced to "cla".
        head, sep, tail = suffix.partition(" ")
        self._singular = head.removesuffix("s") + sep + tail
        self._lock = threading.Lock()
        # Only animate on an interactive terminal outside of Buildkite.
        self.active = not buildkite.is_in_buildkite() and sys.stdout.isatty()

    def run(self) -> None:
        """Redraw the status line every 0.1 seconds while ``active`` is set."""
        symbols = ["⣾", "⣷", "⣯", "⣟", "⡿", "⢿", "⣻", "⣽"]
        i = 0
        while self.active:
            with self._lock:
                remaining = self._remaining
                suffix = self._suffix if remaining != 1 else self._singular
                print(
                    f"\r\033[K{symbols[i]} {remaining} {suffix}",
                    end="",
                    flush=True,
                )
            i = (i + 1) % len(symbols)
            time.sleep(0.1)

    def task_done(self) -> None:
        """Decrement the number of outstanding tasks."""
        with self._lock:
            self._remaining -= 1

    def clear_line(self) -> None:
        """Erase the current terminal line if the spinner is active."""
        if self.active:
            print("\r\033[K", end="", flush=True)

    def stop(self) -> None:
        """Deactivate the spinner and erase its line; no-op when inactive."""
        if self.active:
            self.active = False
            print("\r\033[K", end="", flush=True)


def run_parallel(
    tasks: list[tuple[str, TaskSpec]],
    verbose: bool = False,
    print_duration: bool = True,
    spinner_suffix: str = "tasks",
    print_summary: bool = True,
) -> list[str]:
    """Run tasks in parallel and print results as each task finishes.

    Args:
        tasks: List of (name, spec) pairs.
        verbose: If True, print output even for successful tasks.
        print_duration: If True, include duration in status lines.
        spinner_suffix: Suffix after the count in the spinner, e.g. "tasks".
        print_summary: If True, print a final success/failure summary.

    Returns:
        List of failed task names, in completion order (empty on full success).
    """
    done_q: queue.Queue[TaskThread] = queue.Queue()
    threads = [TaskThread(name, spec, done_queue=done_q) for name, spec in tasks]

    spinner = _SpinnerThread(len(threads), spinner_suffix)
    spinner.start()

    failed: list[str] = []
    try:
        for t in threads:
            t.start()

        # One queue item arrives per task, in completion order.
        for _ in threads:
            finished = done_q.get()
            spinner.task_done()
            spinner.clear_line()
            formatted_duration = (
                f" [{finished.duration.total_seconds():5.2f}s]"
                if print_duration
                else ""
            )
            if finished.success:
                print(f"{_prefix('---')}{OK}{formatted_duration} {finished.name}")
            else:
                print(f"{_prefix('+++')}{FAIL}{formatted_duration} {finished.name}")
                failed.append(finished.name)
            if finished.output and (not finished.success or verbose):
                print(finished.output)
    finally:
        # Stop the spinner even when interrupted so it does not keep
        # redrawing over whatever is printed next.
        spinner.stop()

    if print_summary:
        if failed:
            print(f"{_prefix('+++')}{FAIL} Failed: {failed}")
        else:
            print(f"{_prefix('+++')}{OK} All tasks successful")

    return failed
class TaskThread(threading.Thread):
    """Runs a shell command or callable in a thread, capturing output, duration, and exit status."""

    def __init__(
        self,
        name: str,
        spec: TaskSpec | None = None,
        command: list[str] | None = None,
        done_queue: queue.Queue[TaskThread] | None = None,
    ):
        """Create a task thread.

        Args:
            name: Thread name, also used as the task's display name.
            spec: Command (argv list) or zero-argument callable returning
                ``(success, output)``; takes precedence over ``command``
                when both are given.
            command: Command (argv list), used only when ``spec`` is None.
            done_queue: If given, the thread puts itself on this queue when
                ``run`` finishes.
        """
        super().__init__()
        self.name = name
        self.output: str = ""
        self.success = False
        self.duration: timedelta = timedelta()
        self._done_queue = done_queue
        resolved = command if spec is None else spec
        assert resolved is not None, "must provide spec or command"
        if callable(resolved):
            self._fn: Callable[[], tuple[bool, str]] | None = resolved
            self._command: list[str] | None = None
        else:
            self._fn = None
            self._command = resolved

    def run(self) -> None:
        """Execute the task and record ``success``, ``output`` and ``duration``.

        Exceptions derived from ``Exception`` are turned into a failed result
        whose output is the exception message.  Whatever happens, the thread
        is put on ``done_queue`` (when one was given) so that a consumer
        waiting for one item per task cannot block forever.
        """
        start = datetime.now()
        try:
            if self._fn is not None:
                self.success, self.output = self._fn()
            else:
                assert self._command is not None
                proc = subprocess.run(
                    self._command,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                )
                self.success = proc.returncode == 0
                # Undecodable bytes must not turn a successful command into
                # a failure, so replace them instead of raising.
                self.output = proc.stdout.decode("utf-8", errors="replace").strip()
        except Exception as e:
            self.output = str(e)
            self.success = False
        except BaseException as e:
            # e.g. SystemExit raised by a callable: record the failure and
            # let the threading machinery deal with the exception itself.
            self.output = f"{type(e).__name__}: {e}"
            self.success = False
            raise
        finally:
            self.duration = datetime.now() - start
            if self._done_queue is not None:
                self._done_queue.put(self)
Runs a shell command or callable in a thread, capturing output, duration, and exit status.
45 def __init__( 46 self, 47 name: str, 48 spec: TaskSpec | None = None, 49 command: list[str] | None = None, 50 done_queue: queue.Queue[TaskThread] | None = None, 51 ): 52 super().__init__() 53 self.name = name 54 self.output: str = "" 55 self.success = False 56 self.duration: timedelta = timedelta() 57 self._done_queue = done_queue 58 resolved = command if spec is None else spec 59 assert resolved is not None, "must provide spec or command" 60 if callable(resolved): 61 self._fn: Callable[[], tuple[bool, str]] | None = resolved 62 self._command: list[str] | None = None 63 else: 64 self._fn = None 65 self._command = resolved
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is a list or tuple of arguments for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
@property
def name(self):
    """A string used for identification purposes only.

    It has no semantics. Multiple threads may be given the same name. The
    initial name is set by the constructor.

    Returns the value stored in ``self._name``.
    """
    # Fails when Thread.__init__() has not run on this instance, e.g. a
    # subclass constructor that did not call the base constructor.
    assert self._initialized, "Thread.__init__() not called"
    return self._name
A string used for identification purposes only.
It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.
67 def run(self) -> None: 68 start = datetime.now() 69 try: 70 if self._fn is not None: 71 self.success, self.output = self._fn() 72 else: 73 assert self._command is not None 74 proc = subprocess.Popen( 75 self._command, 76 stdout=subprocess.PIPE, 77 stderr=subprocess.STDOUT, 78 ) 79 stdout, _ = proc.communicate() 80 self.success = proc.returncode == 0 81 self.output = stdout.decode("utf-8").strip() 82 except Exception as e: 83 self.output = str(e) 84 self.success = False 85 self.duration = datetime.now() - start 86 if self._done_queue is not None: 87 self._done_queue.put(self)
Method representing the thread's activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
def run_parallel(
    tasks: list[tuple[str, TaskSpec]],
    verbose: bool = False,
    print_duration: bool = True,
    spinner_suffix: str = "tasks",
    print_summary: bool = True,
) -> list[str]:
    """Run tasks in parallel and print results as each task finishes.

    One ``TaskThread`` is started per task; each status line is printed as
    soon as that task's thread reports completion on a shared queue.

    Args:
        tasks: List of (name, spec) pairs.
        verbose: If True, print output even for successful tasks.
        print_duration: If True, include duration in status lines.
        spinner_suffix: Suffix after the count in the spinner, e.g. "tasks".
        print_summary: If True, print a final success/failure summary.

    Returns:
        List of failed task names, in completion order (empty on full success).
    """
    done_q: queue.Queue[TaskThread] = queue.Queue()
    threads = [TaskThread(name, spec, done_queue=done_q) for name, spec in tasks]

    spinner = _SpinnerThread(len(threads), spinner_suffix)
    spinner.start()

    failed: list[str] = []
    try:
        for t in threads:
            t.start()

        # One queue item arrives per task, in completion order.
        for _ in threads:
            finished = done_q.get()
            spinner.task_done()
            spinner.clear_line()
            formatted_duration = (
                f" [{finished.duration.total_seconds():5.2f}s]"
                if print_duration
                else ""
            )
            if finished.success:
                print(f"{_prefix('---')}{OK}{formatted_duration} {finished.name}")
            else:
                print(f"{_prefix('+++')}{FAIL}{formatted_duration} {finished.name}")
                failed.append(finished.name)
            # Failed tasks always show their output; successful ones only
            # when verbose.
            if finished.output and (not finished.success or verbose):
                print(finished.output)
    finally:
        # Stop the spinner even when interrupted (e.g. Ctrl-C while waiting
        # on the queue) so it does not keep redrawing over later output.
        spinner.stop()

    if print_summary:
        if failed:
            print(f"{_prefix('+++')}{FAIL} Failed: {failed}")
        else:
            print(f"{_prefix('+++')}{OK} All tasks successful")

    return failed
Run tasks in parallel and print results as each task finishes.
Args: tasks: List of (name, spec) pairs. verbose: If True, print output even for successful tasks. print_duration: If True, include duration in status lines. spinner_suffix: Suffix after the count in the spinner, e.g. "tasks". print_summary: If True, print a final success/failure summary.
Returns: List of failed task names (empty on full success).