Module materialize.cli.cloudbench

Launch a benchmark for a particular commit on cloud infrastructure, using bin/scratch.

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Launch benchmark for a particular commit on cloud infrastructure, using bin/scratch"""

import argparse
import base64
import csv
import datetime
import itertools
import os
import shlex
import sys
import time
from typing import NamedTuple, cast

import boto3

from materialize import MZ_ROOT, git, scratch, spawn, util
from materialize.cli.scratch import check_required_vars
from materialize.scratch import print_instances


# This is duplicated with the one in cli/scratch.
# TODO - factor it out.
def main() -> None:
    """Parse the command line and run the selected subcommand.

    The working directory is switched to ``MZ_ROOT`` first, so that the
    relative paths used by the subcommands resolve against the repository
    root.
    """
    os.chdir(MZ_ROOT)

    top_parser = argparse.ArgumentParser()
    registry = top_parser.add_subparsers(dest="subcommand", required=True)

    # Subcommand name -> (hook that adds its arguments, function that runs it).
    subcommands = {
        "start": (configure_start, start),
        "check": (configure_check, check),
        # Not wired up:
        #   "mine": (mine.configure_parser, mine.run),
        #   "destroy": (destroy.configure_parser, destroy.run),
    }
    for command, (add_arguments, handler) in subcommands.items():
        sub = registry.add_parser(command)
        add_arguments(sub)
        # The handler is stashed on the namespace so dispatch is a plain call.
        sub.set_defaults(run=handler)

    parsed = top_parser.parse_args()
    parsed.run(parsed)


def configure_start(parser: argparse.ArgumentParser) -> None:
    """Register the arguments of the ``start`` subcommand on *parser*."""
    profile_help = "Predefined set of machines to use in the cluster. 'basic' is only the Materialize instance; 'confluent' also includes a machine running the Kafka, Schema Registry, etc."
    metadata_help = (
        "whether to append extra metadata to each CSV row before uploading to S3"
    )

    parser.add_argument(
        "--profile",
        required=True,
        type=str,
        choices=["basic", "confluent"],
        help=profile_help,
    )
    parser.add_argument(
        "--trials",
        "-n",
        default=1,
        type=int,
        help="The number of trials to run per git rev",
    )
    parser.add_argument(
        "--revs",
        default="HEAD",
        type=str,
        help="Comma-separated list of git revs to benchmark",
    )
    # REMAINDER: everything from the first positional onwards is collected
    # into the list `bench_script`.
    parser.add_argument(
        "bench_script",
        nargs=argparse.REMAINDER,
        type=str,
        help="Benchmark script (and optional arguments)",
    )
    parser.add_argument("--append_metadata", action="store_true", help=metadata_help)
    parser.add_argument("--s3_root", default=DEFAULT_BUCKET, type=str)


class BenchSuccessResult(NamedTuple):
    """Outcome of a cluster whose benchmark uploaded a result CSV."""

    # Text of the uploaded ``<cluster>.csv`` object.
    stdout: str


class BenchFailureLogs(NamedTuple):
    """Outcome of a cluster whose benchmark uploaded a failure log."""

    # Text of the uploaded ``<cluster>-FAILURE.log`` object.
    log: str


def configure_check(parser: argparse.ArgumentParser) -> None:
    """Register the arguments of the ``check`` subcommand on *parser*."""
    # Bucket that holds the manifest and the per-cluster results.
    parser.add_argument("--s3_root", default=DEFAULT_BUCKET, type=str)
    # nargs=1 makes argparse store the ID as a one-element list.
    parser.add_argument("bench_id", nargs=1, type=str)


# Default value of the `--s3_root` option of both the `start` and `check`
# subcommands.
DEFAULT_BUCKET = "mz-cloudbench"


def try_get_object(key: str, bucket: str) -> str | None:
    """Return the S3 object ``key`` in ``bucket`` decoded as UTF-8.

    Returns ``None`` when S3 reports that the key does not exist; any other
    error propagates to the caller.
    """
    s3 = boto3.client("s3")
    try:
        body = s3.get_object(Bucket=bucket, Key=key)["Body"]
        text = body.read().decode("utf-8")
    except s3.exceptions.NoSuchKey:
        text = None
    return text


def check(ns: argparse.Namespace) -> None:
    """Wait for a benchmark run to finish, then print its merged results as CSV.

    Reads ``{bench_id}/MANIFEST`` from the ``ns.s3_root`` bucket to learn which
    clusters belong to the run, then polls S3 every 60 seconds until each
    cluster has uploaded either ``<cluster>.csv`` or ``<cluster>-FAILURE.log``.

    When every cluster succeeded, the per-cluster CSVs are written to stdout
    under a single header with three columns appended to each row: the first
    two ``-``-separated components of the manifest entry and the row's index
    within that cluster's CSV.

    Args:
        ns: Parsed arguments. ``ns.bench_id`` (a one-element list) and
            ``ns.s3_root`` are read.

    Raises:
        RuntimeError: If the manifest lists no clusters, if any cluster
            uploaded a failure log, or if the result CSVs do not all share
            the same header.
    """
    check_required_vars()
    bench_id = ns.bench_id[0]

    manifest = (
        boto3.client("s3")
        .get_object(Bucket=ns.s3_root, Key=f"{bench_id}/MANIFEST")["Body"]
        .read()
        .decode("utf-8")
        .strip()
    )
    # `"".split("\n")` yields `[""]`, which would make the emptiness check
    # below unreachable and leave the loop polling forever for a cluster with
    # an empty name. `splitlines()` yields `[]` for an empty manifest; blank
    # lines are dropped for the same reason.
    insts = [line for line in manifest.splitlines() if line.strip()]
    if not insts:
        raise RuntimeError(f"No instances found for bench ID {bench_id}")

    results: list[BenchSuccessResult | BenchFailureLogs | None] = [None for _ in insts]
    not_done = list(range(len(results)))
    while not_done:
        for i in not_done:
            # A result CSV takes precedence over a failure log.
            maybe_result = try_get_object(f"{bench_id}/{insts[i]}.csv", ns.s3_root)
            if maybe_result is not None:
                results[i] = BenchSuccessResult(stdout=maybe_result)
                continue
            maybe_out = try_get_object(
                f"{bench_id}/{insts[i]}-FAILURE.log", ns.s3_root
            )
            if maybe_out is not None:
                results[i] = BenchFailureLogs(maybe_out)

        not_done = [i for i in not_done if results[i] is None]
        if not_done:
            print("Benchmark not done; waiting 60 seconds", file=sys.stderr)
            time.sleep(60)

    # Every slot is filled once the loop exits; the assert narrows the type.
    for r in results:
        assert isinstance(r, (BenchSuccessResult, BenchFailureLogs))
    done_results = cast(list[BenchFailureLogs | BenchSuccessResult], results)

    failed = [
        (i, r) for i, r in enumerate(done_results) if isinstance(r, BenchFailureLogs)
    ]
    if failed:
        for i, f in failed:
            print(
                f"Run of instance {insts[i]} failed, log:\n{f.log}",
                file=sys.stderr,
            )
        raise RuntimeError(f"{len(failed)} runs FAILED!")

    good_results = cast(list[BenchSuccessResult], done_results)
    readers = [
        csv.DictReader(f"{line}\n" for line in r.stdout.split("\n"))
        for r in good_results
    ]
    # Lazy: rows are only pulled from the readers while writing below.
    csv_results = ((d.values() for d in r) for r in readers)
    for r in readers:
        assert isinstance(r.fieldnames, list)
        for fn in r.fieldnames:
            assert isinstance(fn, str)
    headers = {tuple(cast(list[str], r.fieldnames)) for r in readers}
    if len(headers) > 1:
        raise RuntimeError("Mismatched headers")

    w = csv.writer(sys.stdout)
    w.writerow(
        cast(list[str], readers[0].fieldnames) + ["InstanceIndex", "Rev", "Trial"]
    )
    for inst, r in zip(insts, csv_results):
        components = inst.split("-")
        for i, entry in enumerate(r):
            w.writerow(itertools.chain(entry, (components[0], components[1], i)))


def start(ns: argparse.Namespace) -> None:
    """Launch one benchmark cluster per (trial, git rev) pair.

    Builds an sdist of ``misc/python``, embeds it in a launch script that runs
    the requested benchmark module and uploads either ``results.csv`` or the
    failure log to S3, writes a ``MANIFEST`` listing every cluster ID to the
    ``ns.s3_root`` bucket, and then launches the clusters one after another.

    Args:
        ns: Parsed arguments. ``revs``, ``trials``, ``bench_script``,
            ``append_metadata``, ``s3_root`` and ``profile`` are read.

    Raises:
        RuntimeError: If no benchmark script was given or the profile is not
            implemented.
    """
    check_required_vars()

    bench_script = ns.bench_script
    if not bench_script:
        # Fail with a clear message instead of an IndexError further down.
        raise RuntimeError("No benchmark script specified")

    revs = ns.revs.split(",")

    clusters = list(
        itertools.product(range(ns.trials), (git.rev_parse(rev) for rev in revs))
    )

    # Both the module name and its arguments end up in a shell script, so both
    # are quoted.
    script_name = shlex.quote(bench_script[0])
    script_args = " ".join(shlex.quote(arg) for arg in bench_script[1:])

    # Package up `misc/python` for shipment to the remote machine. The working
    # directory is restored even if the build fails.
    os.chdir("misc/python")
    try:
        spawn.runv(["python3", "./setup.py", "sdist"])

        with open("./dist/materialize-0.0.0.tar.gz", "rb") as f:
            pkg_data = f.read()
    finally:
        os.chdir(MZ_ROOT)

    if ns.append_metadata:
        munge_result = 'awk \'{ if (NR == 1) { print $0 ",Timestamp,BenchId,ClusterId,GitRef,S3Root" } else { print $0 ",\'$(date +%s)",$MZ_CB_BENCH_ID,$MZ_CB_CLUSTER_ID,$MZ_CB_GIT_REV,$MZ_CB_S3_ROOT"\'"}}\''
    else:
        munge_result = "cat"

    mz_launch_script = f"""echo {shlex.quote(base64.b64encode(pkg_data).decode('utf-8'))} | base64 -d > mz.tar.gz
python3 -m venv /tmp/mzenv >&2
. /tmp/mzenv/bin/activate >&2
python3 -m pip install --upgrade pip >&2
pip3 install ./mz.tar.gz[dev] >&2
MZ_ROOT=/home/ubuntu/materialize python3 -m {script_name} {script_args}
result=$?
echo $result > ~/bench_exit_code
if [ $result -eq 0 ]; then
    {munge_result} < ~/materialize/results.csv | aws s3 cp - s3://{ns.s3_root}/$MZ_CB_BENCH_ID/$MZ_CB_CLUSTER_ID.csv >&2
else
    aws s3 cp - s3://{ns.s3_root}/$MZ_CB_BENCH_ID/$MZ_CB_CLUSTER_ID-FAILURE.log < ~/mzscratch.log >&2
fi
sudo shutdown -h now # save some money
"""

    # Every profile includes the machine that runs the benchmark itself.
    materialized_desc = scratch.MachineDesc(
        name="materialized",
        launch_script=mz_launch_script,
        instance_type="r5a.4xlarge",
        ami="ami-0aeb7c931a5a61206",
        tags={},
        size_gb=64,
    )

    if ns.profile == "basic":
        descs = [materialized_desc]
    elif ns.profile == "confluent":
        confluent_launch_script = """
curl https://packages.confluent.io/deb/7.0/archive.key | sudo apt-key add -
sudo add-apt-repository "deb https://packages.confluent.io/deb/7.0 stable main"
sudo add-apt-repository "deb https://packages.confluent.io/clients/deb $(lsb_release -cs) main"
sudo apt-get update
sudo apt-get install -y openjdk-8-jre-headless confluent-kafka confluent-schema-registry
sudo systemctl start confluent-zookeeper
sudo systemctl start confluent-kafka
sudo systemctl start confluent-schema-registry
"""
        descs = [
            materialized_desc,
            scratch.MachineDesc(
                name="confluent",
                launch_script=confluent_launch_script,
                instance_type="r5a.4xlarge",
                ami="ami-0aeb7c931a5a61206",
                tags={},
                size_gb=1000,
                checkout=False,
            ),
        ]
    else:
        raise RuntimeError(f"Profile {ns.profile} is not implemented yet")

    bench_id = util.nonce(8)

    # The manifest must live in the same bucket the launch script uploads to
    # and `check` reads from; a hard-coded bucket breaks any custom --s3_root.
    manifest_bytes = "".join(f"{i}-{rev}\n" for i, rev in clusters).encode("utf-8")
    boto3.client("s3").put_object(
        Body=manifest_bytes, Bucket=ns.s3_root, Key=f"{bench_id}/MANIFEST"
    )

    launched_by = scratch.whoami()

    # TODO - Do these in parallel
    launched = []
    for i, rev in clusters:
        launched.extend(
            scratch.launch_cluster(
                descs=descs,
                nonce=f"{bench_id}-{i}-{rev}",
                extra_tags={
                    "bench_id": bench_id,
                    "bench_rev": rev,
                    "bench_i": str(i),
                    "LaunchedBy": launched_by,
                },
                # Read by the launch script to build its S3 upload keys.
                extra_env={
                    "MZ_CB_BENCH_ID": bench_id,
                    "MZ_CB_CLUSTER_ID": f"{i}-{rev}",
                    "MZ_CB_GIT_REV": rev,
                    "MZ_CB_S3_ROOT": ns.s3_root,
                },
                delete_after=datetime.datetime.utcnow() + datetime.timedelta(days=1),
                git_rev=rev,
            )
        )

    print("Launched instances:")
    print_instances(launched, format="table")  # todo
    print(
        f"""Launched cloud bench with ID {bench_id}.
To wait for results, run: bin/cloudbench check {bench_id}"""
    )


# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

Functions

def check(ns: argparse.Namespace) ‑> None
Expand source code Browse git
def check(ns: argparse.Namespace) -> None:
    """Wait for a benchmark run to finish, then print its merged results as CSV.

    Reads ``{bench_id}/MANIFEST`` from the ``ns.s3_root`` bucket to learn which
    clusters belong to the run, then polls S3 every 60 seconds until each
    cluster has uploaded either ``<cluster>.csv`` or ``<cluster>-FAILURE.log``.

    When every cluster succeeded, the per-cluster CSVs are written to stdout
    under a single header with three columns appended to each row: the first
    two ``-``-separated components of the manifest entry and the row's index
    within that cluster's CSV.

    Args:
        ns: Parsed arguments. ``ns.bench_id`` (a one-element list) and
            ``ns.s3_root`` are read.

    Raises:
        RuntimeError: If the manifest lists no clusters, if any cluster
            uploaded a failure log, or if the result CSVs do not all share
            the same header.
    """
    check_required_vars()
    bench_id = ns.bench_id[0]

    manifest = (
        boto3.client("s3")
        .get_object(Bucket=ns.s3_root, Key=f"{bench_id}/MANIFEST")["Body"]
        .read()
        .decode("utf-8")
        .strip()
    )
    # `"".split("\n")` yields `[""]`, which would make the emptiness check
    # below unreachable and leave the loop polling forever for a cluster with
    # an empty name. `splitlines()` yields `[]` for an empty manifest; blank
    # lines are dropped for the same reason.
    insts = [line for line in manifest.splitlines() if line.strip()]
    if not insts:
        raise RuntimeError(f"No instances found for bench ID {bench_id}")

    results: list[BenchSuccessResult | BenchFailureLogs | None] = [None for _ in insts]
    not_done = list(range(len(results)))
    while not_done:
        for i in not_done:
            # A result CSV takes precedence over a failure log.
            maybe_result = try_get_object(f"{bench_id}/{insts[i]}.csv", ns.s3_root)
            if maybe_result is not None:
                results[i] = BenchSuccessResult(stdout=maybe_result)
                continue
            maybe_out = try_get_object(
                f"{bench_id}/{insts[i]}-FAILURE.log", ns.s3_root
            )
            if maybe_out is not None:
                results[i] = BenchFailureLogs(maybe_out)

        not_done = [i for i in not_done if results[i] is None]
        if not_done:
            print("Benchmark not done; waiting 60 seconds", file=sys.stderr)
            time.sleep(60)

    # Every slot is filled once the loop exits; the assert narrows the type.
    for r in results:
        assert isinstance(r, (BenchSuccessResult, BenchFailureLogs))
    done_results = cast(list[BenchFailureLogs | BenchSuccessResult], results)

    failed = [
        (i, r) for i, r in enumerate(done_results) if isinstance(r, BenchFailureLogs)
    ]
    if failed:
        for i, f in failed:
            print(
                f"Run of instance {insts[i]} failed, log:\n{f.log}",
                file=sys.stderr,
            )
        raise RuntimeError(f"{len(failed)} runs FAILED!")

    good_results = cast(list[BenchSuccessResult], done_results)
    readers = [
        csv.DictReader(f"{line}\n" for line in r.stdout.split("\n"))
        for r in good_results
    ]
    # Lazy: rows are only pulled from the readers while writing below.
    csv_results = ((d.values() for d in r) for r in readers)
    for r in readers:
        assert isinstance(r.fieldnames, list)
        for fn in r.fieldnames:
            assert isinstance(fn, str)
    headers = {tuple(cast(list[str], r.fieldnames)) for r in readers}
    if len(headers) > 1:
        raise RuntimeError("Mismatched headers")

    w = csv.writer(sys.stdout)
    w.writerow(
        cast(list[str], readers[0].fieldnames) + ["InstanceIndex", "Rev", "Trial"]
    )
    for inst, r in zip(insts, csv_results):
        components = inst.split("-")
        for i, entry in enumerate(r):
            w.writerow(itertools.chain(entry, (components[0], components[1], i)))
def configure_check(parser: argparse.ArgumentParser) ‑> None
Expand source code Browse git
def configure_check(parser: argparse.ArgumentParser) -> None:
    """Register the arguments of the ``check`` subcommand on *parser*."""
    # Bucket that holds the manifest and the per-cluster results.
    parser.add_argument("--s3_root", default=DEFAULT_BUCKET, type=str)
    # nargs=1 makes argparse store the ID as a one-element list.
    parser.add_argument("bench_id", nargs=1, type=str)
def configure_start(parser: argparse.ArgumentParser) ‑> None
Expand source code Browse git
def configure_start(parser: argparse.ArgumentParser) -> None:
    """Register the arguments of the ``start`` subcommand on *parser*."""
    profile_help = "Predefined set of machines to use in the cluster. 'basic' is only the Materialize instance; 'confluent' also includes a machine running the Kafka, Schema Registry, etc."
    metadata_help = (
        "whether to append extra metadata to each CSV row before uploading to S3"
    )

    parser.add_argument(
        "--profile",
        required=True,
        type=str,
        choices=["basic", "confluent"],
        help=profile_help,
    )
    parser.add_argument(
        "--trials",
        "-n",
        default=1,
        type=int,
        help="The number of trials to run per git rev",
    )
    parser.add_argument(
        "--revs",
        default="HEAD",
        type=str,
        help="Comma-separated list of git revs to benchmark",
    )
    # REMAINDER: everything from the first positional onwards is collected
    # into the list `bench_script`.
    parser.add_argument(
        "bench_script",
        nargs=argparse.REMAINDER,
        type=str,
        help="Benchmark script (and optional arguments)",
    )
    parser.add_argument("--append_metadata", action="store_true", help=metadata_help)
    parser.add_argument("--s3_root", default=DEFAULT_BUCKET, type=str)
def main() ‑> None
Expand source code Browse git
def main() -> None:
    """Parse the command line and run the selected subcommand.

    The working directory is switched to ``MZ_ROOT`` first, so that the
    relative paths used by the subcommands resolve against the repository
    root.
    """
    os.chdir(MZ_ROOT)

    top_parser = argparse.ArgumentParser()
    registry = top_parser.add_subparsers(dest="subcommand", required=True)

    # Subcommand name -> (hook that adds its arguments, function that runs it).
    subcommands = {
        "start": (configure_start, start),
        "check": (configure_check, check),
        # Not wired up:
        #   "mine": (mine.configure_parser, mine.run),
        #   "destroy": (destroy.configure_parser, destroy.run),
    }
    for command, (add_arguments, handler) in subcommands.items():
        sub = registry.add_parser(command)
        add_arguments(sub)
        # The handler is stashed on the namespace so dispatch is a plain call.
        sub.set_defaults(run=handler)

    parsed = top_parser.parse_args()
    parsed.run(parsed)
def start(ns: argparse.Namespace) ‑> None
Expand source code Browse git
def start(ns: argparse.Namespace) -> None:
    """Launch one benchmark cluster per (trial, git rev) pair.

    Builds an sdist of ``misc/python``, embeds it in a launch script that runs
    the requested benchmark module and uploads either ``results.csv`` or the
    failure log to S3, writes a ``MANIFEST`` listing every cluster ID to the
    ``ns.s3_root`` bucket, and then launches the clusters one after another.

    Args:
        ns: Parsed arguments. ``revs``, ``trials``, ``bench_script``,
            ``append_metadata``, ``s3_root`` and ``profile`` are read.

    Raises:
        RuntimeError: If no benchmark script was given or the profile is not
            implemented.
    """
    check_required_vars()

    bench_script = ns.bench_script
    if not bench_script:
        # Fail with a clear message instead of an IndexError further down.
        raise RuntimeError("No benchmark script specified")

    revs = ns.revs.split(",")

    clusters = list(
        itertools.product(range(ns.trials), (git.rev_parse(rev) for rev in revs))
    )

    # Both the module name and its arguments end up in a shell script, so both
    # are quoted.
    script_name = shlex.quote(bench_script[0])
    script_args = " ".join(shlex.quote(arg) for arg in bench_script[1:])

    # Package up `misc/python` for shipment to the remote machine. The working
    # directory is restored even if the build fails.
    os.chdir("misc/python")
    try:
        spawn.runv(["python3", "./setup.py", "sdist"])

        with open("./dist/materialize-0.0.0.tar.gz", "rb") as f:
            pkg_data = f.read()
    finally:
        os.chdir(MZ_ROOT)

    if ns.append_metadata:
        munge_result = 'awk \'{ if (NR == 1) { print $0 ",Timestamp,BenchId,ClusterId,GitRef,S3Root" } else { print $0 ",\'$(date +%s)",$MZ_CB_BENCH_ID,$MZ_CB_CLUSTER_ID,$MZ_CB_GIT_REV,$MZ_CB_S3_ROOT"\'"}}\''
    else:
        munge_result = "cat"

    mz_launch_script = f"""echo {shlex.quote(base64.b64encode(pkg_data).decode('utf-8'))} | base64 -d > mz.tar.gz
python3 -m venv /tmp/mzenv >&2
. /tmp/mzenv/bin/activate >&2
python3 -m pip install --upgrade pip >&2
pip3 install ./mz.tar.gz[dev] >&2
MZ_ROOT=/home/ubuntu/materialize python3 -m {script_name} {script_args}
result=$?
echo $result > ~/bench_exit_code
if [ $result -eq 0 ]; then
    {munge_result} < ~/materialize/results.csv | aws s3 cp - s3://{ns.s3_root}/$MZ_CB_BENCH_ID/$MZ_CB_CLUSTER_ID.csv >&2
else
    aws s3 cp - s3://{ns.s3_root}/$MZ_CB_BENCH_ID/$MZ_CB_CLUSTER_ID-FAILURE.log < ~/mzscratch.log >&2
fi
sudo shutdown -h now # save some money
"""

    # Every profile includes the machine that runs the benchmark itself.
    materialized_desc = scratch.MachineDesc(
        name="materialized",
        launch_script=mz_launch_script,
        instance_type="r5a.4xlarge",
        ami="ami-0aeb7c931a5a61206",
        tags={},
        size_gb=64,
    )

    if ns.profile == "basic":
        descs = [materialized_desc]
    elif ns.profile == "confluent":
        confluent_launch_script = """
curl https://packages.confluent.io/deb/7.0/archive.key | sudo apt-key add -
sudo add-apt-repository "deb https://packages.confluent.io/deb/7.0 stable main"
sudo add-apt-repository "deb https://packages.confluent.io/clients/deb $(lsb_release -cs) main"
sudo apt-get update
sudo apt-get install -y openjdk-8-jre-headless confluent-kafka confluent-schema-registry
sudo systemctl start confluent-zookeeper
sudo systemctl start confluent-kafka
sudo systemctl start confluent-schema-registry
"""
        descs = [
            materialized_desc,
            scratch.MachineDesc(
                name="confluent",
                launch_script=confluent_launch_script,
                instance_type="r5a.4xlarge",
                ami="ami-0aeb7c931a5a61206",
                tags={},
                size_gb=1000,
                checkout=False,
            ),
        ]
    else:
        raise RuntimeError(f"Profile {ns.profile} is not implemented yet")

    bench_id = util.nonce(8)

    # The manifest must live in the same bucket the launch script uploads to
    # and `check` reads from; a hard-coded bucket breaks any custom --s3_root.
    manifest_bytes = "".join(f"{i}-{rev}\n" for i, rev in clusters).encode("utf-8")
    boto3.client("s3").put_object(
        Body=manifest_bytes, Bucket=ns.s3_root, Key=f"{bench_id}/MANIFEST"
    )

    launched_by = scratch.whoami()

    # TODO - Do these in parallel
    launched = []
    for i, rev in clusters:
        launched.extend(
            scratch.launch_cluster(
                descs=descs,
                nonce=f"{bench_id}-{i}-{rev}",
                extra_tags={
                    "bench_id": bench_id,
                    "bench_rev": rev,
                    "bench_i": str(i),
                    "LaunchedBy": launched_by,
                },
                # Read by the launch script to build its S3 upload keys.
                extra_env={
                    "MZ_CB_BENCH_ID": bench_id,
                    "MZ_CB_CLUSTER_ID": f"{i}-{rev}",
                    "MZ_CB_GIT_REV": rev,
                    "MZ_CB_S3_ROOT": ns.s3_root,
                },
                delete_after=datetime.datetime.utcnow() + datetime.timedelta(days=1),
                git_rev=rev,
            )
        )

    print("Launched instances:")
    print_instances(launched, format="table")  # todo
    print(
        f"""Launched cloud bench with ID {bench_id}.
To wait for results, run: bin/cloudbench check {bench_id}"""
    )
def try_get_object(key: str, bucket: str) ‑> str | None
Expand source code Browse git
def try_get_object(key: str, bucket: str) -> str | None:
    """Return the S3 object ``key`` in ``bucket`` decoded as UTF-8.

    Returns ``None`` when S3 reports that the key does not exist; any other
    error propagates to the caller.
    """
    s3 = boto3.client("s3")
    try:
        body = s3.get_object(Bucket=bucket, Key=key)["Body"]
        text = body.read().decode("utf-8")
    except s3.exceptions.NoSuchKey:
        text = None
    return text

Classes

class BenchFailureLogs (log: str)

BenchFailureLogs(log,)

Expand source code Browse git
class BenchFailureLogs(NamedTuple):
    """Outcome of a cluster whose benchmark uploaded a failure log."""

    # Text of the uploaded ``<cluster>-FAILURE.log`` object.
    log: str

Ancestors

  • builtins.tuple

Instance variables

var log : str

Alias for field number 0

class BenchSuccessResult (stdout: str)

BenchSuccessResult(stdout,)

Expand source code Browse git
class BenchSuccessResult(NamedTuple):
    """Outcome of a cluster whose benchmark uploaded a result CSV."""

    # Text of the uploaded ``<cluster>.csv`` object.
    stdout: str

Ancestors

  • builtins.tuple

Instance variables

var stdout : str

Alias for field number 0