Module materialize.cli.ci_upload_heap_profiles

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
#
# ci_upload_heap_profiles.py - Upload memory heap profiles during an mzcompose run

import argparse
import json
import re
import subprocess
import sys
import time
from threading import Thread

# Matches a clusterd command line as printed by `ps aux`. Captures:
#   socket      - value of --internal-http-listen-addr (later passed to
#                 `curl --unix-socket`)
#   cluster_id  - from --opentelemetry-resource=cluster_id=...
#   replica_id  - from --opentelemetry-resource=replica_id=...
#   process     - the digits of a `/<N>.pid` path appearing after those flags
# The adjacent raw strings below concatenate into a single pattern.
CLUSTERD_COMMAND_RE = re.compile(
    r"--internal-http-listen-addr=(?P<socket>[^ ]*)"
    r".*--opentelemetry-resource=cluster_id=(?P<cluster_id>[^ ]*)"
    r".*--opentelemetry-resource=replica_id=(?P<replica_id>[^ ]*)"
    r".*/(?P<process>[0-9]+)\.pid"
)


def main() -> int:
    """Capture heap profiles from the services of an mzcompose composition.

    Lists the composition's services with ``bin/mzcompose ... ps``. For every
    service whose image is ``materialize/clusterd:*`` or
    ``materialize/materialized:*``, fetches a heap profile by running ``curl``
    inside that container. For ``materialized`` services it also scans
    ``ps aux`` inside the container for command lines matching
    ``CLUSTERD_COMMAND_RE``. It then fetches a profile from each match over
    the Unix socket named on its command line.

    Each profile is written to ``prof-<service>[<suffix>]-<timestamp>.pb.gz``
    in the current directory. Unless ``--no-upload`` is given, it is then
    uploaded with ``buildkite-agent artifact upload``. Fetches run
    concurrently, one thread per target.

    Returns:
        Always 0. Failing to fetch or upload an individual profile is
        tolerated: that profile is simply skipped.
    """
    parser = argparse.ArgumentParser(
        prog="ci-upload-heap-profiles",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="ci-upload-heap-profiles uploads memory heap profiles during an mzcompose run",
    )

    parser.add_argument("composition", type=str)
    parser.add_argument("--upload", action=argparse.BooleanOptionalAction, default=True)

    args = parser.parse_args()
    mzcompose = ["bin/mzcompose", "--find", args.composition, "--mz-quiet"]
    time_str = time.strftime("%Y-%m-%d_%H_%M_%S")
    threads: list[Thread] = []

    def run(service: str, backend: list[str], suffix: str = "") -> None:
        """Fetch one heap profile via curl inside `service`, save it, and
        optionally upload it.

        `backend` is the trailing part of the curl command line: a URL,
        optionally preceded by extra curl options such as
        ``--unix-socket <path>``.
        """
        # --fail makes curl print nothing and exit non-zero on an HTTP error
        # status. Without it, an error page would be saved (and uploaded) as
        # if it were a .pb.gz profile.
        result = subprocess.run(
            mzcompose + ["exec", service, "curl", "--silent", "--fail"] + backend,
            text=False,
            capture_output=True,
        )
        heap_profile = result.stdout

        # Best effort: skip targets that failed or returned nothing. A
        # non-zero exit may also mean the transfer was cut short, leaving a
        # truncated profile.
        if result.returncode != 0 or not heap_profile:
            return

        filename = f"prof-{service}{suffix}-{time_str}.pb.gz"
        with open(filename, "wb") as f:
            f.write(heap_profile)
        if args.upload:
            subprocess.run(
                [
                    "buildkite-agent",
                    "artifact",
                    "upload",
                    "--log-level",
                    "error",
                    filename,
                ]
            )

    ps_output = subprocess.run(
        mzcompose + ["ps", "--format", "json"], text=True, capture_output=True
    ).stdout.strip()
    # Docker Compose prints `ps --format json` either as one JSON array (older
    # releases) or as one JSON object per line (v2.21 and later). Accept both
    # forms, and treat empty output (no running services) as no services.
    if ps_output.startswith("["):
        services = json.loads(ps_output)
    else:
        services = [
            json.loads(line) for line in ps_output.splitlines() if line.strip()
        ]

    for service in services:
        if service["Image"].startswith("materialize/clusterd:"):
            threads.append(
                Thread(
                    target=run,
                    args=(service["Service"], ["http://127.0.0.1:6878/heap"]),
                )
            )
        elif service["Image"].startswith("materialize/materialized:"):
            threads.append(
                Thread(
                    target=run,
                    args=(service["Service"], ["http://127.0.0.1:6878/prof/heap"]),
                )
            )

            # Look for clusterd processes running inside the materialized
            # container and profile each one through its own HTTP socket.
            for line in subprocess.run(
                mzcompose + ["exec", service["Service"], "ps", "aux"],
                text=True,
                capture_output=True,
            ).stdout.splitlines():
                if match := CLUSTERD_COMMAND_RE.search(line):
                    threads.append(
                        Thread(
                            target=run,
                            args=(
                                service["Service"],
                                [
                                    "--unix-socket",
                                    match.group("socket"),
                                    # NOTE(review): curl is lenient about the
                                    # single slash here; confirm which request
                                    # path the clusterd endpoint receives.
                                    "http:/prof/heap",
                                ],
                                f"-cluster-{match.group('cluster_id')}-replica-{match.group('replica_id')}-process-{match.group('process')}",
                            ),
                        )
                    )

    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    return 0


if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())

Functions

def main() -> int
Expand source code Browse git
def main() -> int:
    """Capture heap profiles from the services of an mzcompose composition.

    Lists the composition's services with ``bin/mzcompose ... ps``. For every
    service whose image is ``materialize/clusterd:*`` or
    ``materialize/materialized:*``, fetches a heap profile by running ``curl``
    inside that container. For ``materialized`` services it also scans
    ``ps aux`` inside the container for command lines matching
    ``CLUSTERD_COMMAND_RE``. It then fetches a profile from each match over
    the Unix socket named on its command line.

    Each profile is written to ``prof-<service>[<suffix>]-<timestamp>.pb.gz``
    in the current directory. Unless ``--no-upload`` is given, it is then
    uploaded with ``buildkite-agent artifact upload``. Fetches run
    concurrently, one thread per target.

    Returns:
        Always 0. Failing to fetch or upload an individual profile is
        tolerated: that profile is simply skipped.
    """
    parser = argparse.ArgumentParser(
        prog="ci-upload-heap-profiles",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="ci-upload-heap-profiles uploads memory heap profiles during an mzcompose run",
    )

    parser.add_argument("composition", type=str)
    parser.add_argument("--upload", action=argparse.BooleanOptionalAction, default=True)

    args = parser.parse_args()
    mzcompose = ["bin/mzcompose", "--find", args.composition, "--mz-quiet"]
    time_str = time.strftime("%Y-%m-%d_%H_%M_%S")
    threads: list[Thread] = []

    def run(service: str, backend: list[str], suffix: str = "") -> None:
        """Fetch one heap profile via curl inside `service`, save it, and
        optionally upload it.

        `backend` is the trailing part of the curl command line: a URL,
        optionally preceded by extra curl options such as
        ``--unix-socket <path>``.
        """
        # --fail makes curl print nothing and exit non-zero on an HTTP error
        # status. Without it, an error page would be saved (and uploaded) as
        # if it were a .pb.gz profile.
        result = subprocess.run(
            mzcompose + ["exec", service, "curl", "--silent", "--fail"] + backend,
            text=False,
            capture_output=True,
        )
        heap_profile = result.stdout

        # Best effort: skip targets that failed or returned nothing. A
        # non-zero exit may also mean the transfer was cut short, leaving a
        # truncated profile.
        if result.returncode != 0 or not heap_profile:
            return

        filename = f"prof-{service}{suffix}-{time_str}.pb.gz"
        with open(filename, "wb") as f:
            f.write(heap_profile)
        if args.upload:
            subprocess.run(
                [
                    "buildkite-agent",
                    "artifact",
                    "upload",
                    "--log-level",
                    "error",
                    filename,
                ]
            )

    ps_output = subprocess.run(
        mzcompose + ["ps", "--format", "json"], text=True, capture_output=True
    ).stdout.strip()
    # Docker Compose prints `ps --format json` either as one JSON array (older
    # releases) or as one JSON object per line (v2.21 and later). Accept both
    # forms, and treat empty output (no running services) as no services.
    if ps_output.startswith("["):
        services = json.loads(ps_output)
    else:
        services = [
            json.loads(line) for line in ps_output.splitlines() if line.strip()
        ]

    for service in services:
        if service["Image"].startswith("materialize/clusterd:"):
            threads.append(
                Thread(
                    target=run,
                    args=(service["Service"], ["http://127.0.0.1:6878/heap"]),
                )
            )
        elif service["Image"].startswith("materialize/materialized:"):
            threads.append(
                Thread(
                    target=run,
                    args=(service["Service"], ["http://127.0.0.1:6878/prof/heap"]),
                )
            )

            # Look for clusterd processes running inside the materialized
            # container and profile each one through its own HTTP socket.
            for line in subprocess.run(
                mzcompose + ["exec", service["Service"], "ps", "aux"],
                text=True,
                capture_output=True,
            ).stdout.splitlines():
                if match := CLUSTERD_COMMAND_RE.search(line):
                    threads.append(
                        Thread(
                            target=run,
                            args=(
                                service["Service"],
                                [
                                    "--unix-socket",
                                    match.group("socket"),
                                    # NOTE(review): curl is lenient about the
                                    # single slash here; confirm which request
                                    # path the clusterd endpoint receives.
                                    "http:/prof/heap",
                                ],
                                f"-cluster-{match.group('cluster_id')}-replica-{match.group('replica_id')}-process-{match.group('process')}",
                            ),
                        )
                    )

    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    return 0