Module materialize.cli.ci_upload_heap_profiles
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
#
# ci_upload_heap_profiles.py - Upload memory heap profiles during an mzcompose run
import argparse
import json
import re
import subprocess
import sys
import time
from threading import Thread
CLUSTERD_COMMAND_RE = re.compile(
r"--internal-http-listen-addr=(?P<socket>[^ ]*).*--opentelemetry-resource=cluster_id=(?P<cluster_id>[^ ]*).*--opentelemetry-resource=replica_id=(?P<replica_id>[^ ]*).*/(?P<process>[0-9]+)\.pid"
)
def main() -> int:
parser = argparse.ArgumentParser(
prog="ci-upload-heap-profiles",
formatter_class=argparse.RawDescriptionHelpFormatter,
description="ci-upload-heap-profiles uploads memory heap profiles during an mzcompose run",
)
parser.add_argument("composition", type=str)
parser.add_argument("--upload", action=argparse.BooleanOptionalAction, default=True)
args = parser.parse_args()
mzcompose = ["bin/mzcompose", "--find", args.composition, "--mz-quiet"]
time_str = time.strftime("%Y-%m-%d_%H_%M_%S")
threads = []
def run(service: str, backend: list[str], suffix: str = ""):
heap_profile = subprocess.run(
mzcompose + ["exec", service, "curl", "--silent"] + backend,
text=False,
capture_output=True,
).stdout
if not heap_profile:
return
filename = f"prof-{service}{suffix}-{time_str}.pb.gz"
with open(filename, "wb") as f:
f.write(heap_profile)
if args.upload:
subprocess.run(
[
"buildkite-agent",
"artifact",
"upload",
"--log-level",
"error",
filename,
]
)
services = json.loads(
subprocess.run(
mzcompose + ["ps", "--format", "json"], text=True, capture_output=True
).stdout
)
for service in services:
if service["Image"].startswith("materialize/clusterd:"):
threads.append(
Thread(
target=run,
args=(service["Service"], ["http://127.0.0.1:6878/heap"]),
)
)
elif service["Image"].startswith("materialize/materialized:"):
threads.append(
Thread(
target=run,
args=(service["Service"], ["http://127.0.0.1:6878/prof/heap"]),
)
)
for line in subprocess.run(
mzcompose + ["exec", service["Service"], "ps", "aux"],
text=True,
capture_output=True,
).stdout.splitlines():
if match := CLUSTERD_COMMAND_RE.search(line):
threads.append(
Thread(
target=run,
args=(
service["Service"],
[
"--unix-socket",
match.group("socket"),
"http:/prof/heap",
],
f"-cluster-{match.group('cluster_id')}-replica-{match.group('replica_id')}-process-{match.group('process')}",
),
)
)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
return 0
if __name__ == "__main__":
sys.exit(main())
Functions
def main() ‑> int
-
Expand source code Browse git
def main() -> int: parser = argparse.ArgumentParser( prog="ci-upload-heap-profiles", formatter_class=argparse.RawDescriptionHelpFormatter, description="ci-upload-heap-profiles uploads memory heap profiles during an mzcompose run", ) parser.add_argument("composition", type=str) parser.add_argument("--upload", action=argparse.BooleanOptionalAction, default=True) args = parser.parse_args() mzcompose = ["bin/mzcompose", "--find", args.composition, "--mz-quiet"] time_str = time.strftime("%Y-%m-%d_%H_%M_%S") threads = [] def run(service: str, backend: list[str], suffix: str = ""): heap_profile = subprocess.run( mzcompose + ["exec", service, "curl", "--silent"] + backend, text=False, capture_output=True, ).stdout if not heap_profile: return filename = f"prof-{service}{suffix}-{time_str}.pb.gz" with open(filename, "wb") as f: f.write(heap_profile) if args.upload: subprocess.run( [ "buildkite-agent", "artifact", "upload", "--log-level", "error", filename, ] ) services = json.loads( subprocess.run( mzcompose + ["ps", "--format", "json"], text=True, capture_output=True ).stdout ) for service in services: if service["Image"].startswith("materialize/clusterd:"): threads.append( Thread( target=run, args=(service["Service"], ["http://127.0.0.1:6878/heap"]), ) ) elif service["Image"].startswith("materialize/materialized:"): threads.append( Thread( target=run, args=(service["Service"], ["http://127.0.0.1:6878/prof/heap"]), ) ) for line in subprocess.run( mzcompose + ["exec", service["Service"], "ps", "aux"], text=True, capture_output=True, ).stdout.splitlines(): if match := CLUSTERD_COMMAND_RE.search(line): threads.append( Thread( target=run, args=( service["Service"], [ "--unix-socket", match.group("socket"), "http:/prof/heap", ], f"-cluster-{match.group('cluster_id')}-replica-{match.group('replica_id')}-process-{match.group('process')}", ), ) ) for thread in threads: thread.start() for thread in threads: thread.join() return 0