Module materialize.cli.run

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
#
# run.py — build and run a core service or test.

import argparse
import os
import shlex
import shutil
import signal
import sys
import tempfile
import uuid
from urllib.parse import urlparse

import psutil

from materialize import MZ_ROOT, rustc_flags, spawn, ui
from materialize.mzcompose import DEFAULT_SYSTEM_PARAMETERS
from materialize.ui import UIError
from materialize.xcompile import Arch

# Binaries that `main` can build and then execute directly; together with
# "test" these form the valid values of the `program` positional argument.
KNOWN_PROGRAMS = ["environmentd", "sqllogictest"]
# Binaries that `_build` always passes to cargo via `--bin`, in addition to
# any program that was explicitly requested.
REQUIRED_SERVICES = ["clusterd"]

# Target triple handed to cargo via `--target` when a sanitizer is enabled.
# It also names the subdirectory of `target/` that holds the sanitizer build
# artifacts.
SANITIZER_TARGET = (
    f"{Arch.host()}-unknown-linux-gnu"
    if sys.platform.startswith("linux")
    else f"{Arch.host()}-apple-darwin"
)
# Fallback for `--postgres` when the MZDEV_POSTGRES environment variable is
# not set.
DEFAULT_POSTGRES = "postgres://root@localhost:26257/materialize"

# Entitlements applied to the built binaries (e.g. environmentd) by
# `_macos_codesign`, so that they can be inspected with Instruments.
MACOS_ENTITLEMENTS_DATA = """
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
    <dict>
        <key>com.apple.security.get-task-allow</key>
        <true/>
    </dict>
</plist>
"""


def main() -> int:
    """Parse the command line, build the requested program, and run it.

    For the programs in `KNOWN_PROGRAMS` the binary is built with cargo
    (together with the services in `REQUIRED_SERVICES`), codesigned on macOS
    unless disabled, and then executed with Materialize-specific arguments.
    For `test`, `cargo test` is run instead.

    Returns:
        The build's return code when `--build-only` is given. Otherwise this
        function does not return normally: it exits the process with the
        exit status of the program that was run.

    Raises:
        UIError: if the program is unknown or a SQL setup statement fails.
    """
    parser = argparse.ArgumentParser(
        prog="run",
        description="""Build and run a core service or test.
        Wraps `cargo run` and `cargo test` with Materialize-specific logic.""",
    )
    parser.add_argument(
        "program",
        help="the name of the program to run",
        choices=[*KNOWN_PROGRAMS, "test"],
    )
    parser.add_argument(
        "args",
        help="Arguments to pass to the program",
        nargs="*",
    )
    parser.add_argument(
        "--reset",
        help="Delete data from prior runs of the program",
        action="store_true",
    )
    parser.add_argument(
        "--postgres",
        help="Postgres/CockroachDB connection string",
        default=os.getenv("MZDEV_POSTGRES", DEFAULT_POSTGRES),
    )
    parser.add_argument(
        "--release",
        help="Build artifacts in release mode, with optimizations",
        action="store_true",
    )
    parser.add_argument(
        "--timings",
        help="Output timing information",
        action="store_true",
    )
    parser.add_argument(
        "--features",
        help="Comma separated list of features to activate",
    )
    parser.add_argument(
        "--no-default-features",
        help="Do not activate the `default` feature",
        action="store_true",
    )
    parser.add_argument(
        "-p",
        "--package",
        help="Package to run tests for",
        action="append",
        default=[],
    )
    parser.add_argument(
        "--test",
        help="Test only the specified test target",
        action="append",
        default=[],
    )
    parser.add_argument(
        "--tokio-console",
        help="Activate the Tokio console",
        action="store_true",
    )
    parser.add_argument(
        "--build-only",
        help="Only build, don't run",
        action="store_true",
    )
    parser.add_argument(
        "--disable-mac-codesigning",
        help="Disables the limited codesigning we do on macOS to support Instruments",
        action="store_true",
    )
    parser.add_argument(
        "--coverage",
        help="Build with coverage",
        default=False,
        action="store_true",
    )
    parser.add_argument(
        "--sanitizer",
        help="Build with sanitizer",
        type=str,
        default="none",
    )
    parser.add_argument(
        "--wrapper",
        help="Wrapper command for the program",
    )
    parser.add_argument(
        "--monitoring",
        help="Automatically send monitoring data.",
        default=False,
        action="store_true",
    )
    args = parser.parse_intermixed_args()

    # Handle `+toolchain` like rustup.
    args.channel = None
    if len(args.args) > 0 and args.args[0].startswith("+"):
        args.channel = args.args[0]
        del args.args[0]

    # Snapshot of the environment that the child process is exec'd with. Any
    # variable meant for the child must be set on `env`, not on `os.environ`.
    env = dict(os.environ)
    if args.program in KNOWN_PROGRAMS:
        (build_retcode, built_programs) = _build(args, extra_programs=[args.program])
        if args.build_only:
            return build_retcode

        # Sanitizer builds are cross-compiled with `--target`, so cargo puts
        # their artifacts in a target-specific subdirectory.
        artifact_path = MZ_ROOT / "target"
        if args.sanitizer != "none":
            artifact_path = artifact_path / SANITIZER_TARGET
        artifact_path = artifact_path / ("release" if args.release else "debug")

        if args.disable_mac_codesigning:
            if sys.platform != "darwin":
                print("Ignoring --disable-mac-codesigning since we're not on macOS")
            else:
                print("Disabled macOS Codesigning")
        elif sys.platform == "darwin":
            for program in built_programs:
                path = artifact_path / program
                _macos_codesign(str(path))

        if args.wrapper:
            command = shlex.split(args.wrapper)
        else:
            command = []
        command.append(str(artifact_path / args.program))
        if args.tokio_console:
            command += ["--tokio-console-listen-addr=127.0.0.1:6669"]
        if args.program == "environmentd":
            _handle_lingering_services(kill=args.reset)
            mzdata = MZ_ROOT / "mzdata"
            scratch = MZ_ROOT / "scratch"
            db = urlparse(args.postgres).path.removeprefix("/")
            _run_sql(args.postgres, f"CREATE DATABASE IF NOT EXISTS {db}")
            for schema in ["consensus", "tsoracle", "storage"]:
                if args.reset:
                    _run_sql(args.postgres, f"DROP SCHEMA IF EXISTS {schema} CASCADE")
                _run_sql(args.postgres, f"CREATE SCHEMA IF NOT EXISTS {schema}")
            # Keep this after clearing out Postgres. Otherwise there is a race
            # where a ctrl-c could leave persist with references in Postgres to
            # files that have been deleted. There's no race if we reset in the
            # opposite order.
            if args.reset:
                # Remove everything in the `mzdata` and `scratch` directories,
                # except that the `prometheus` directory itself is kept (only
                # its contents are removed) and `tempo` is left untouched.
                paths = list(mzdata.glob("prometheus/*"))
                paths.extend(
                    p
                    for p in mzdata.glob("*")
                    if p.name != "prometheus" and p.name != "tempo"
                )
                paths.extend(p for p in scratch.glob("*"))
                for path in paths:
                    print(f"Removing {path}...")
                    if path.is_dir():
                        shutil.rmtree(path, ignore_errors=True)
                    else:
                        path.unlink()

            mzdata.mkdir(exist_ok=True)
            scratch.mkdir(exist_ok=True)
            # Reuse the environment ID from a previous run if one was
            # recorded; otherwise generate one and persist it.
            environment_file = mzdata / "environment-id"
            try:
                environment_id = environment_file.read_text().rstrip()
            except FileNotFoundError:
                environment_id = f"local-az1-{uuid.uuid4()}-0"
                environment_file.write_text(environment_id)

            command += [
                # Setting the listen addresses below to 0.0.0.0 is required
                # to allow Prometheus running in Docker (misc/prometheus)
                # access these services to scrape metrics.
                "--internal-http-listen-addr=0.0.0.0:6878",
                "--orchestrator=process",
                f"--orchestrator-process-secrets-directory={mzdata}/secrets",
                "--orchestrator-process-tcp-proxy-listen-addr=0.0.0.0",
                f"--orchestrator-process-prometheus-service-discovery-directory={mzdata}/prometheus",
                f"--orchestrator-process-scratch-directory={scratch}",
                "--secrets-controller=local-file",
                f"--persist-consensus-url={args.postgres}?options=--search_path=consensus",
                f"--persist-blob-url=file://{mzdata}/persist/blob",
                f"--timestamp-oracle-url={args.postgres}?options=--search_path=tsoracle",
                f"--storage-stash-url={args.postgres}?options=--search_path=storage",
                f"--environment-id={environment_id}",
                "--bootstrap-role=materialize",
                *args.args,
            ]
            if args.monitoring:
                command += ["--opentelemetry-endpoint=http://localhost:4317"]
        elif args.program == "sqllogictest":
            formatted_params = [
                f"{key}={value}" for key, value in DEFAULT_SYSTEM_PARAMETERS.items()
            ]
            env["MZ_SYSTEM_PARAMETER_DEFAULT"] = ";".join(formatted_params)
            db = urlparse(args.postgres).path.removeprefix("/")
            _run_sql(args.postgres, f"CREATE DATABASE IF NOT EXISTS {db}")
            command += [f"--postgres-url={args.postgres}", *args.args]
    elif args.program == "test":
        (build_retcode, _) = _build(args)
        if args.build_only:
            return build_retcode

        command = _cargo_command(args, "test")
        for package in args.package:
            command += ["--package", package]
        for test in args.test:
            command += ["--test", test]
        command += args.args
        command += ["--", "--nocapture"]
        # Set the variable on `env`, which is what the child is exec'd with.
        # `os.environ` was snapshotted above, so mutating it here would never
        # reach the test process.
        env["COCKROACH_URL"] = args.postgres
    else:
        raise UIError(f"unknown program {args.program}")

    print(f"$ {' '.join(command)}")
    # We go through a dance here familiar to shell authors where both
    # the parent and child try to put the child into its own process
    # group.  (See the comments in jobs.c:make_child() in bash, for
    # example, which further cite the POSIX Rationale.)  We will later
    # kill this group, which catches children like clusterd which
    # outlive their parent (and hence, their PPID is 1; but their PGID
    # remains the child's).  We also put the child into the foreground
    # to ensure signals, such as SIGINT from ^C and SIGQUIT from ^\,
    # are delivered to it, rather than to us.
    child_pid = os.fork()
    assert child_pid >= 0
    if child_pid == 0:
        try:
            os.setsid()
            os.setpgid(os.getpid(), os.getpid())
        except OSError:
            pass
        _set_foreground_process(os.getpid())
        os.execvpe(command[0], command, env)

    try:
        os.setpgid(child_pid, child_pid)
    except OSError:
        pass
    (_, ws) = os.wait()
    # Clean up anything left in the child's process group; the group may
    # already be gone, which is fine.
    try:
        os.killpg(child_pid, signal.SIGTERM)
    except ProcessLookupError:
        pass
    # `sys.exit` rather than the `exit` builtin, which is only installed by
    # the `site` module and is intended for interactive use.
    sys.exit(os.waitstatus_to_exitcode(ws))


def _set_foreground_process(pid: int) -> None:
    """Make the calling process's group the terminal's foreground group.

    Does nothing when stderr is not attached to a terminal.

    Args:
        pid: Not used by the body; the process group of the calling process
            (`os.getpgrp()`) is what gets moved to the foreground.
    """
    # Conventionally, stderr is used for this purpose as the
    # least-likely stream to be redirected in an interactive context.
    stderr_fd = sys.stderr.fileno()
    if os.isatty(stderr_fd):
        # Ignore SIGTTOU before touching the terminal; presumably this keeps
        # the tcsetpgrp call below from stopping this process while it is
        # not yet in the foreground.
        signal.signal(signal.SIGTTOU, signal.SIG_IGN)
        with open(os.ttyname(stderr_fd), "w") as terminal:
            os.tcsetpgrp(terminal.fileno(), os.getpgrp())


def _build(
    args: argparse.Namespace, extra_programs: list[str] = []
) -> tuple[int, list[str]]:
    """Run `cargo build` for the required services plus `extra_programs`.

    Coverage and sanitizer options from `args` are translated into extra
    compiler/linker flags that are appended to the corresponding environment
    variables for the cargo invocation.

    Args:
        args: Parsed command-line arguments.
        extra_programs: Additional binaries to build on top of
            `REQUIRED_SERVICES`. The default list is only read, never
            mutated.

    Returns:
        A tuple of cargo's return code and the list of binaries that were
        requested from cargo.
    """
    build_env = dict(os.environ)
    cargo = _cargo_command(args, "build")

    def append_flags(variable: str, flags) -> None:
        # Extend whatever the caller's environment already holds rather than
        # replacing it.
        build_env[variable] = build_env.get(variable, "") + " " + " ".join(flags)

    if args.coverage:
        append_flags("RUSTFLAGS", rustc_flags.coverage)
    if args.sanitizer != "none":
        append_flags("RUSTFLAGS", rustc_flags.sanitizer[args.sanitizer])
        # The C, C++ and linker flag variables all receive the same flags.
        for variable in ("CFLAGS", "CXXFLAGS", "LDFLAGS"):
            append_flags(variable, rustc_flags.sanitizer_cflags[args.sanitizer])

    requested_features = args.features.split(",") if args.features else []
    if requested_features:
        cargo.append(f"--features={','.join(requested_features)}")

    programs = [*REQUIRED_SERVICES, *extra_programs]
    for name in programs:
        cargo.extend(["--bin", name])

    result = spawn.runv(cargo, env=build_env)
    return (result.returncode, programs)


def _macos_codesign(path: str) -> None:
    """Codesign the binary at `path` with the Instruments entitlements.

    The entitlements in `MACOS_ENTITLEMENTS_DATA` are written to a temporary
    file that is passed to `codesign` via `--entitlements`.

    Args:
        path: Filesystem path of the binary to sign.
    """
    env = dict(os.environ)

    # Use the temporary file as a context manager so that it is closed (and
    # thereby removed) deterministically once codesign has finished, instead
    # of whenever the object happens to be garbage collected.
    with tempfile.NamedTemporaryFile() as entitlements:
        entitlements.write(bytes(MACOS_ENTITLEMENTS_DATA, "utf-8"))
        # Make sure the data is on disk before codesign reads it by name.
        entitlements.flush()

        command = [
            "codesign",
            "-s",
            "-",
            "-f",
            "--entitlements",
            entitlements.name,
            path,
        ]
        spawn.runv(command, env=env)


def _cargo_command(args: argparse.Namespace, subcommand: str) -> list[str]:
    """Assemble the common prefix of a cargo invocation.

    Args:
        args: Parsed command-line arguments; reads `channel`, `release`,
            `timings`, `no_default_features` and `sanitizer`.
        subcommand: The cargo subcommand, e.g. "build" or "test".

    Returns:
        The argv list, starting with "cargo", to which callers append
        further arguments.
    """
    # A `+toolchain` selector, if any, must come before the subcommand.
    toolchain = [args.channel] if args.channel else []
    argv = ["cargo", *toolchain, subcommand]

    switches = [
        (args.release, "--release"),
        (args.timings, "--timings"),
        (args.no_default_features, "--no-default-features"),
    ]
    argv.extend(flag for enabled, flag in switches if enabled)

    if args.sanitizer != "none":
        argv.extend(["-Zbuild-std", "--target", SANITIZER_TARGET])
    return argv


def _run_sql(url: str, sql: str) -> None:
    """Execute a single SQL statement against `url` using `psql`.

    Args:
        url: Connection string passed to `psql`.
        sql: The statement to execute via `psql -c`.

    Raises:
        UIError: if running `psql` raises any exception; the original
            exception is attached as the cause.
    """
    try:
        spawn.runv(["psql", "-AtX", url, "-c", sql])
    except Exception as e:
        # Chain explicitly so the underlying failure is recorded as the
        # direct cause of the user-facing error.
        raise UIError(
            f"unable to execute postgres statement: {e}",
            hint="Have you installed and started CockroachDB?",
        ) from e


def _handle_lingering_services(kill: bool = False) -> None:
    """Report, and optionally kill, leftover required-service processes.

    Every running process whose name is in `REQUIRED_SERVICES` is examined.
    Processes owned by a different real UID are ignored with a message.
    Processes owned by the current user are killed when `kill` is true, and
    otherwise only produce a warning that they will be reused.

    Args:
        kill: Whether to kill matching processes owned by the current user.
    """
    own_uid = os.getuid()
    for process in psutil.process_iter():
        try:
            if process.name() not in REQUIRED_SERVICES:
                continue
            if process.uids().real != own_uid:
                print(
                    f"Ignoring {process.name()} process with different UID (PID {process.pid}, likely running in Docker)"
                )
                continue
            if kill:
                print(f"Killing orphaned {process.name()} process (PID {process.pid})")
                process.kill()
            else:
                ui.warn(
                    f"Existing {process.name()} process (PID {process.pid}) will be reused"
                )
        except psutil.NoSuchProcess:
            # The process exited while it was being inspected; nothing to do.
            continue


if __name__ == "__main__":
    with ui.error_handler("run"):
        # Propagate main()'s return value (the build status when
        # `--build-only` is given) as the process exit code instead of
        # silently discarding it.
        sys.exit(main())

Functions

def main() ‑> int
Expand source code Browse git
def main() -> int:
    """Parse the command line, build the requested program, and run it.

    For the programs in `KNOWN_PROGRAMS` the binary is built with cargo
    (together with the services in `REQUIRED_SERVICES`), codesigned on macOS
    unless disabled, and then executed with Materialize-specific arguments.
    For `test`, `cargo test` is run instead.

    Returns:
        The build's return code when `--build-only` is given. Otherwise this
        function does not return normally: it exits the process with the
        exit status of the program that was run.

    Raises:
        UIError: if the program is unknown or a SQL setup statement fails.
    """
    parser = argparse.ArgumentParser(
        prog="run",
        description="""Build and run a core service or test.
        Wraps `cargo run` and `cargo test` with Materialize-specific logic.""",
    )
    parser.add_argument(
        "program",
        help="the name of the program to run",
        choices=[*KNOWN_PROGRAMS, "test"],
    )
    parser.add_argument(
        "args",
        help="Arguments to pass to the program",
        nargs="*",
    )
    parser.add_argument(
        "--reset",
        help="Delete data from prior runs of the program",
        action="store_true",
    )
    parser.add_argument(
        "--postgres",
        help="Postgres/CockroachDB connection string",
        default=os.getenv("MZDEV_POSTGRES", DEFAULT_POSTGRES),
    )
    parser.add_argument(
        "--release",
        help="Build artifacts in release mode, with optimizations",
        action="store_true",
    )
    parser.add_argument(
        "--timings",
        help="Output timing information",
        action="store_true",
    )
    parser.add_argument(
        "--features",
        help="Comma separated list of features to activate",
    )
    parser.add_argument(
        "--no-default-features",
        help="Do not activate the `default` feature",
        action="store_true",
    )
    parser.add_argument(
        "-p",
        "--package",
        help="Package to run tests for",
        action="append",
        default=[],
    )
    parser.add_argument(
        "--test",
        help="Test only the specified test target",
        action="append",
        default=[],
    )
    parser.add_argument(
        "--tokio-console",
        help="Activate the Tokio console",
        action="store_true",
    )
    parser.add_argument(
        "--build-only",
        help="Only build, don't run",
        action="store_true",
    )
    parser.add_argument(
        "--disable-mac-codesigning",
        help="Disables the limited codesigning we do on macOS to support Instruments",
        action="store_true",
    )
    parser.add_argument(
        "--coverage",
        help="Build with coverage",
        default=False,
        action="store_true",
    )
    parser.add_argument(
        "--sanitizer",
        help="Build with sanitizer",
        type=str,
        default="none",
    )
    parser.add_argument(
        "--wrapper",
        help="Wrapper command for the program",
    )
    parser.add_argument(
        "--monitoring",
        help="Automatically send monitoring data.",
        default=False,
        action="store_true",
    )
    args = parser.parse_intermixed_args()

    # Handle `+toolchain` like rustup.
    args.channel = None
    if len(args.args) > 0 and args.args[0].startswith("+"):
        args.channel = args.args[0]
        del args.args[0]

    # Snapshot of the environment that the child process is exec'd with. Any
    # variable meant for the child must be set on `env`, not on `os.environ`.
    env = dict(os.environ)
    if args.program in KNOWN_PROGRAMS:
        (build_retcode, built_programs) = _build(args, extra_programs=[args.program])
        if args.build_only:
            return build_retcode

        # Sanitizer builds are cross-compiled with `--target`, so cargo puts
        # their artifacts in a target-specific subdirectory.
        artifact_path = MZ_ROOT / "target"
        if args.sanitizer != "none":
            artifact_path = artifact_path / SANITIZER_TARGET
        artifact_path = artifact_path / ("release" if args.release else "debug")

        if args.disable_mac_codesigning:
            if sys.platform != "darwin":
                print("Ignoring --disable-mac-codesigning since we're not on macOS")
            else:
                print("Disabled macOS Codesigning")
        elif sys.platform == "darwin":
            for program in built_programs:
                path = artifact_path / program
                _macos_codesign(str(path))

        if args.wrapper:
            command = shlex.split(args.wrapper)
        else:
            command = []
        command.append(str(artifact_path / args.program))
        if args.tokio_console:
            command += ["--tokio-console-listen-addr=127.0.0.1:6669"]
        if args.program == "environmentd":
            _handle_lingering_services(kill=args.reset)
            mzdata = MZ_ROOT / "mzdata"
            scratch = MZ_ROOT / "scratch"
            db = urlparse(args.postgres).path.removeprefix("/")
            _run_sql(args.postgres, f"CREATE DATABASE IF NOT EXISTS {db}")
            for schema in ["consensus", "tsoracle", "storage"]:
                if args.reset:
                    _run_sql(args.postgres, f"DROP SCHEMA IF EXISTS {schema} CASCADE")
                _run_sql(args.postgres, f"CREATE SCHEMA IF NOT EXISTS {schema}")
            # Keep this after clearing out Postgres. Otherwise there is a race
            # where a ctrl-c could leave persist with references in Postgres to
            # files that have been deleted. There's no race if we reset in the
            # opposite order.
            if args.reset:
                # Remove everything in the `mzdata` and `scratch` directories,
                # except that the `prometheus` directory itself is kept (only
                # its contents are removed) and `tempo` is left untouched.
                paths = list(mzdata.glob("prometheus/*"))
                paths.extend(
                    p
                    for p in mzdata.glob("*")
                    if p.name != "prometheus" and p.name != "tempo"
                )
                paths.extend(p for p in scratch.glob("*"))
                for path in paths:
                    print(f"Removing {path}...")
                    if path.is_dir():
                        shutil.rmtree(path, ignore_errors=True)
                    else:
                        path.unlink()

            mzdata.mkdir(exist_ok=True)
            scratch.mkdir(exist_ok=True)
            # Reuse the environment ID from a previous run if one was
            # recorded; otherwise generate one and persist it.
            environment_file = mzdata / "environment-id"
            try:
                environment_id = environment_file.read_text().rstrip()
            except FileNotFoundError:
                environment_id = f"local-az1-{uuid.uuid4()}-0"
                environment_file.write_text(environment_id)

            command += [
                # Setting the listen addresses below to 0.0.0.0 is required
                # to allow Prometheus running in Docker (misc/prometheus)
                # access these services to scrape metrics.
                "--internal-http-listen-addr=0.0.0.0:6878",
                "--orchestrator=process",
                f"--orchestrator-process-secrets-directory={mzdata}/secrets",
                "--orchestrator-process-tcp-proxy-listen-addr=0.0.0.0",
                f"--orchestrator-process-prometheus-service-discovery-directory={mzdata}/prometheus",
                f"--orchestrator-process-scratch-directory={scratch}",
                "--secrets-controller=local-file",
                f"--persist-consensus-url={args.postgres}?options=--search_path=consensus",
                f"--persist-blob-url=file://{mzdata}/persist/blob",
                f"--timestamp-oracle-url={args.postgres}?options=--search_path=tsoracle",
                f"--storage-stash-url={args.postgres}?options=--search_path=storage",
                f"--environment-id={environment_id}",
                "--bootstrap-role=materialize",
                *args.args,
            ]
            if args.monitoring:
                command += ["--opentelemetry-endpoint=http://localhost:4317"]
        elif args.program == "sqllogictest":
            formatted_params = [
                f"{key}={value}" for key, value in DEFAULT_SYSTEM_PARAMETERS.items()
            ]
            env["MZ_SYSTEM_PARAMETER_DEFAULT"] = ";".join(formatted_params)
            db = urlparse(args.postgres).path.removeprefix("/")
            _run_sql(args.postgres, f"CREATE DATABASE IF NOT EXISTS {db}")
            command += [f"--postgres-url={args.postgres}", *args.args]
    elif args.program == "test":
        (build_retcode, _) = _build(args)
        if args.build_only:
            return build_retcode

        command = _cargo_command(args, "test")
        for package in args.package:
            command += ["--package", package]
        for test in args.test:
            command += ["--test", test]
        command += args.args
        command += ["--", "--nocapture"]
        # Set the variable on `env`, which is what the child is exec'd with.
        # `os.environ` was snapshotted above, so mutating it here would never
        # reach the test process.
        env["COCKROACH_URL"] = args.postgres
    else:
        raise UIError(f"unknown program {args.program}")

    print(f"$ {' '.join(command)}")
    # We go through a dance here familiar to shell authors where both
    # the parent and child try to put the child into its own process
    # group.  (See the comments in jobs.c:make_child() in bash, for
    # example, which further cite the POSIX Rationale.)  We will later
    # kill this group, which catches children like clusterd which
    # outlive their parent (and hence, their PPID is 1; but their PGID
    # remains the child's).  We also put the child into the foreground
    # to ensure signals, such as SIGINT from ^C and SIGQUIT from ^\,
    # are delivered to it, rather than to us.
    child_pid = os.fork()
    assert child_pid >= 0
    if child_pid == 0:
        try:
            os.setsid()
            os.setpgid(os.getpid(), os.getpid())
        except OSError:
            pass
        _set_foreground_process(os.getpid())
        os.execvpe(command[0], command, env)

    try:
        os.setpgid(child_pid, child_pid)
    except OSError:
        pass
    (_, ws) = os.wait()
    # Clean up anything left in the child's process group; the group may
    # already be gone, which is fine.
    try:
        os.killpg(child_pid, signal.SIGTERM)
    except ProcessLookupError:
        pass
    # `sys.exit` rather than the `exit` builtin, which is only installed by
    # the `site` module and is intended for interactive use.
    sys.exit(os.waitstatus_to_exitcode(ws))