Module materialize.lint.lint
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import argparse
import os
import subprocess
import threading
import time
from datetime import datetime, timedelta
from pathlib import Path
from materialize import MZ_ROOT, buildkite
from materialize.terminal import (
COLOR_ERROR,
COLOR_OK,
STYLE_BOLD,
with_formatting,
with_formattings,
)
MAIN_PATH = MZ_ROOT / "ci" / "test" / "lint-main"
MAIN_CHECKS_PATH = MAIN_PATH / "checks"
CHECK_BEFORE_PATH = MAIN_PATH / "before"
CHECK_AFTER_PATH = MAIN_PATH / "after"
OK = with_formatting("✓", COLOR_OK)
FAIL = with_formattings("✗", [COLOR_ERROR, STYLE_BOLD])
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
prog="lint",
formatter_class=argparse.RawDescriptionHelpFormatter,
description="Lint the code",
)
parser.add_argument("--print-duration", action="store_true")
parser.add_argument("--verbose", action="store_true")
return parser.parse_args()
def main() -> int:
args = parse_args()
print_duration = args.print_duration
verbose_output = args.verbose
manager = LintManager(print_duration, verbose_output)
return_code = manager.run()
return return_code
def prefix(ci: str = "---") -> str:
return ci + " " if buildkite.is_in_buildkite() else ""
class LintManager:
def __init__(self, print_duration: bool, verbose_output: bool):
self.print_duration = print_duration
self.verbose_output = verbose_output
def run(self) -> int:
failed_checks = self.run_and_validate_if_no_previous_failures(
CHECK_BEFORE_PATH, previous_failures=[]
)
failed_checks = self.run_and_validate_if_no_previous_failures(
MAIN_CHECKS_PATH, previous_failures=failed_checks
)
failed_checks = self.run_and_validate_if_no_previous_failures(
CHECK_AFTER_PATH, previous_failures=failed_checks
)
success = len(failed_checks) == 0
print(
prefix("+++") + f"{OK} All checks successful"
if success
else f"{FAIL} Checks failed: {failed_checks}"
)
return 0 if success else 1
def is_ignore_file(self, path: Path) -> bool:
return os.path.isdir(path)
def run_and_validate_if_no_previous_failures(
self, checks_path: Path, previous_failures: list[str]
) -> list[str]:
if len(previous_failures) > 0:
print(
f"{prefix()}Skipping checks in '{checks_path}' due to previous failures"
)
return previous_failures
else:
return self.run_and_validate(checks_path)
def run_and_validate(self, checks_path: Path) -> list[str]:
"""
Runs checks in the given directory and validates their outcome.
:return: names of failed checks
"""
lint_files = [
lint_file
for lint_file in os.listdir(checks_path)
if lint_file.endswith(".sh")
and not self.is_ignore_file(checks_path / lint_file)
]
lint_files.sort()
threads = []
check = "check" if len(lint_files) == 1 else "checks"
status_printer_thread = StatusPrinterThread(
f"{len(lint_files)} {check} in {checks_path.relative_to(MZ_ROOT)}"
)
status_printer_thread.start()
for lint_file in lint_files:
thread = LintingThread(checks_path, lint_file)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
status_printer_thread.stop()
failed_checks = []
for thread in threads:
formatted_duration = (
f" in {thread.duration.total_seconds():.2f}s"
if self.print_duration
else ""
)
if thread.success:
status = f"{OK}{formatted_duration}"
print(f"{prefix('---')}{status} {thread.name}")
else:
status = f"{FAIL}{formatted_duration}"
print(f"{prefix('+++')}{status} {thread.name}")
failed_checks.append(thread.name)
if thread.has_output() and (not thread.success or self.verbose_output):
print(thread.output)
return failed_checks
class LintingThread(threading.Thread):
def __init__(self, checks_path: Path, lint_file: str):
super().__init__(target=self.run_single_script, args=(checks_path, lint_file))
self.name = lint_file
self.output: str = ""
self.success = False
self.duration: timedelta | None = None
def run_single_script(self, directory_path: Path, file_name: str) -> None:
start_time = datetime.now()
try:
# Note that coloring gets lost (e.g., in git diff)
proc = subprocess.Popen(
directory_path / file_name,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
stdout, _ = proc.communicate()
self.success = proc.returncode == 0
self.capture_output(stdout)
except Exception as e:
print(f"Error: {e}")
self.success = False
end_time = datetime.now()
self.duration = end_time - start_time
def capture_output(self, stdout: bytes) -> None:
# stdout contains both stdout and stderr because stderr is piped there
self.output = stdout.decode("utf-8").strip()
def has_output(self) -> bool:
return len(self.output) > 0
class StatusPrinterThread(threading.Thread):
def __init__(self, current_step: str) -> None:
super().__init__(target=self.print_status, args=())
self.active = not buildkite.is_in_buildkite()
self.current_step = current_step
def print_status(self) -> None:
symbols = ["⣾", "⣷", "⣯", "⣟", "⡿", "⢿", "⣻", "⣽"]
i = 0
while self.active:
print(f"\r\033[K{symbols[i]} {self.current_step}", end="", flush=True)
i = (i + 1) % len(symbols)
time.sleep(0.1)
def stop(self) -> None:
if self.active:
self.active = False
print(f"\r\033[K{prefix()}{self.current_step}", end="", flush=True)
print()
if __name__ == "__main__":
exit(main())
Functions
def main() ‑> int
-
Expand source code Browse git
def main() -> int: args = parse_args() print_duration = args.print_duration verbose_output = args.verbose manager = LintManager(print_duration, verbose_output) return_code = manager.run() return return_code
def parse_args() ‑> argparse.Namespace
-
Expand source code Browse git
def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( prog="lint", formatter_class=argparse.RawDescriptionHelpFormatter, description="Lint the code", ) parser.add_argument("--print-duration", action="store_true") parser.add_argument("--verbose", action="store_true") return parser.parse_args()
def prefix(ci: str = '---') ‑> str
-
Expand source code Browse git
def prefix(ci: str = "---") -> str: return ci + " " if buildkite.is_in_buildkite() else ""
Classes
class LintManager (print_duration: bool, verbose_output: bool)
-
Expand source code Browse git
class LintManager: def __init__(self, print_duration: bool, verbose_output: bool): self.print_duration = print_duration self.verbose_output = verbose_output def run(self) -> int: failed_checks = self.run_and_validate_if_no_previous_failures( CHECK_BEFORE_PATH, previous_failures=[] ) failed_checks = self.run_and_validate_if_no_previous_failures( MAIN_CHECKS_PATH, previous_failures=failed_checks ) failed_checks = self.run_and_validate_if_no_previous_failures( CHECK_AFTER_PATH, previous_failures=failed_checks ) success = len(failed_checks) == 0 print( prefix("+++") + f"{OK} All checks successful" if success else f"{FAIL} Checks failed: {failed_checks}" ) return 0 if success else 1 def is_ignore_file(self, path: Path) -> bool: return os.path.isdir(path) def run_and_validate_if_no_previous_failures( self, checks_path: Path, previous_failures: list[str] ) -> list[str]: if len(previous_failures) > 0: print( f"{prefix()}Skipping checks in '{checks_path}' due to previous failures" ) return previous_failures else: return self.run_and_validate(checks_path) def run_and_validate(self, checks_path: Path) -> list[str]: """ Runs checks in the given directory and validates their outcome. :return: names of failed checks """ lint_files = [ lint_file for lint_file in os.listdir(checks_path) if lint_file.endswith(".sh") and not self.is_ignore_file(checks_path / lint_file) ] lint_files.sort() threads = [] check = "check" if len(lint_files) == 1 else "checks" status_printer_thread = StatusPrinterThread( f"{len(lint_files)} {check} in {checks_path.relative_to(MZ_ROOT)}" ) status_printer_thread.start() for lint_file in lint_files: thread = LintingThread(checks_path, lint_file) thread.start() threads.append(thread) for thread in threads: thread.join() status_printer_thread.stop() failed_checks = [] for thread in threads: formatted_duration = ( f" in {thread.duration.total_seconds():.2f}s" if self.print_duration else "" ) if thread.success: status = f"{OK}{formatted_duration}" print(f"{prefix('---')}{status} {thread.name}") else: status = f"{FAIL}{formatted_duration}" print(f"{prefix('+++')}{status} {thread.name}") failed_checks.append(thread.name) if thread.has_output() and (not thread.success or self.verbose_output): print(thread.output) return failed_checks
Methods
def is_ignore_file(self, path: pathlib.Path) ‑> bool
-
Expand source code Browse git
def is_ignore_file(self, path: Path) -> bool: return os.path.isdir(path)
def run(self) ‑> int
-
Expand source code Browse git
def run(self) -> int: failed_checks = self.run_and_validate_if_no_previous_failures( CHECK_BEFORE_PATH, previous_failures=[] ) failed_checks = self.run_and_validate_if_no_previous_failures( MAIN_CHECKS_PATH, previous_failures=failed_checks ) failed_checks = self.run_and_validate_if_no_previous_failures( CHECK_AFTER_PATH, previous_failures=failed_checks ) success = len(failed_checks) == 0 print( prefix("+++") + f"{OK} All checks successful" if success else f"{FAIL} Checks failed: {failed_checks}" ) return 0 if success else 1
def run_and_validate(self, checks_path: pathlib.Path) ‑> list[str]
-
Runs checks in the given directory and validates their outcome. :return: names of failed checks
Expand source code Browse git
def run_and_validate(self, checks_path: Path) -> list[str]: """ Runs checks in the given directory and validates their outcome. :return: names of failed checks """ lint_files = [ lint_file for lint_file in os.listdir(checks_path) if lint_file.endswith(".sh") and not self.is_ignore_file(checks_path / lint_file) ] lint_files.sort() threads = [] check = "check" if len(lint_files) == 1 else "checks" status_printer_thread = StatusPrinterThread( f"{len(lint_files)} {check} in {checks_path.relative_to(MZ_ROOT)}" ) status_printer_thread.start() for lint_file in lint_files: thread = LintingThread(checks_path, lint_file) thread.start() threads.append(thread) for thread in threads: thread.join() status_printer_thread.stop() failed_checks = [] for thread in threads: formatted_duration = ( f" in {thread.duration.total_seconds():.2f}s" if self.print_duration else "" ) if thread.success: status = f"{OK}{formatted_duration}" print(f"{prefix('---')}{status} {thread.name}") else: status = f"{FAIL}{formatted_duration}" print(f"{prefix('+++')}{status} {thread.name}") failed_checks.append(thread.name) if thread.has_output() and (not thread.success or self.verbose_output): print(thread.output) return failed_checks
def run_and_validate_if_no_previous_failures(self, checks_path: pathlib.Path, previous_failures: list[str]) ‑> list[str]
-
Expand source code Browse git
def run_and_validate_if_no_previous_failures( self, checks_path: Path, previous_failures: list[str] ) -> list[str]: if len(previous_failures) > 0: print( f"{prefix()}Skipping checks in '{checks_path}' due to previous failures" ) return previous_failures else: return self.run_and_validate(checks_path)
class LintingThread (checks_path: pathlib.Path, lint_file: str)
-
A class that represents a thread of control.
This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is a list or tuple of arguments for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.init()) before doing anything else to the thread.
Expand source code Browse git
class LintingThread(threading.Thread): def __init__(self, checks_path: Path, lint_file: str): super().__init__(target=self.run_single_script, args=(checks_path, lint_file)) self.name = lint_file self.output: str = "" self.success = False self.duration: timedelta | None = None def run_single_script(self, directory_path: Path, file_name: str) -> None: start_time = datetime.now() try: # Note that coloring gets lost (e.g., in git diff) proc = subprocess.Popen( directory_path / file_name, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) stdout, _ = proc.communicate() self.success = proc.returncode == 0 self.capture_output(stdout) except Exception as e: print(f"Error: {e}") self.success = False end_time = datetime.now() self.duration = end_time - start_time def capture_output(self, stdout: bytes) -> None: # stdout contains both stdout and stderr because stderr is piped there self.output = stdout.decode("utf-8").strip() def has_output(self) -> bool: return len(self.output) > 0
Ancestors
- threading.Thread
Methods
def capture_output(self, stdout: bytes) ‑> None
-
Expand source code Browse git
def capture_output(self, stdout: bytes) -> None: # stdout contains both stdout and stderr because stderr is piped there self.output = stdout.decode("utf-8").strip()
def has_output(self) ‑> bool
-
Expand source code Browse git
def has_output(self) -> bool: return len(self.output) > 0
def run_single_script(self, directory_path: pathlib.Path, file_name: str) ‑> None
-
Expand source code Browse git
def run_single_script(self, directory_path: Path, file_name: str) -> None: start_time = datetime.now() try: # Note that coloring gets lost (e.g., in git diff) proc = subprocess.Popen( directory_path / file_name, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) stdout, _ = proc.communicate() self.success = proc.returncode == 0 self.capture_output(stdout) except Exception as e: print(f"Error: {e}") self.success = False end_time = datetime.now() self.duration = end_time - start_time
class StatusPrinterThread (current_step: str)
-
A class that represents a thread of control.
This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is a list or tuple of arguments for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.init()) before doing anything else to the thread.
Expand source code Browse git
class StatusPrinterThread(threading.Thread): def __init__(self, current_step: str) -> None: super().__init__(target=self.print_status, args=()) self.active = not buildkite.is_in_buildkite() self.current_step = current_step def print_status(self) -> None: symbols = ["⣾", "⣷", "⣯", "⣟", "⡿", "⢿", "⣻", "⣽"] i = 0 while self.active: print(f"\r\033[K{symbols[i]} {self.current_step}", end="", flush=True) i = (i + 1) % len(symbols) time.sleep(0.1) def stop(self) -> None: if self.active: self.active = False print(f"\r\033[K{prefix()}{self.current_step}", end="", flush=True) print()
Ancestors
- threading.Thread
Methods
def print_status(self) ‑> None
-
Expand source code Browse git
def print_status(self) -> None: symbols = ["⣾", "⣷", "⣯", "⣟", "⡿", "⢿", "⣻", "⣽"] i = 0 while self.active: print(f"\r\033[K{symbols[i]} {self.current_step}", end="", flush=True) i = (i + 1) % len(symbols) time.sleep(0.1)
def stop(self) ‑> None
-
Expand source code Browse git
def stop(self) -> None: if self.active: self.active = False print(f"\r\033[K{prefix()}{self.current_step}", end="", flush=True) print()