Module materialize.scalability.benchmark_executor

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import pathlib
import threading
import time
from concurrent import futures
from typing import Any

import pandas as pd
from psycopg import Cursor

from materialize.scalability.benchmark_config import BenchmarkConfiguration
from materialize.scalability.benchmark_result import BenchmarkResult
from materialize.scalability.comparison_outcome import ComparisonOutcome
from materialize.scalability.df import df_details_cols, df_totals_cols
from materialize.scalability.df.df_details import DfDetails, concat_df_details
from materialize.scalability.df.df_totals import DfTotals, concat_df_totals
from materialize.scalability.endpoint import Endpoint
from materialize.scalability.io import paths
from materialize.scalability.operation import Operation
from materialize.scalability.result_analyzer import ResultAnalyzer
from materialize.scalability.schema import Schema
from materialize.scalability.workload import Workload, WorkloadWithContext
from materialize.scalability.workload_result import WorkloadResult
from materialize.scalability.workloads import *  # noqa: F401 F403
from materialize.scalability.workloads_test import *  # noqa: F401 F403

# Number of additional runs allowed when a comparison reports a regression;
# the first run is not counted.
MAX_RETRIES_ON_REGRESSION = 2


class BenchmarkExecutor:
    """Run scalability workloads against endpoints and collect their results.

    Every configured workload class is run against the optional baseline
    endpoint and against each of the other endpoints. When a baseline result
    exists, the result analyzer compares both results, and a run that reports
    regressions is repeated up to ``MAX_RETRIES_ON_REGRESSION`` times.
    """

    def __init__(
        self,
        config: BenchmarkConfiguration,
        schema: Schema,
        baseline_endpoint: Endpoint | None,
        other_endpoints: list[Endpoint],
        result_analyzer: ResultAnalyzer,
    ):
        """Store the collaborators and start with an empty ``BenchmarkResult``."""
        self.config = config
        self.schema = schema
        self.baseline_endpoint = baseline_endpoint
        self.other_endpoints = other_endpoints
        self.result_analyzer = result_analyzer
        # Accumulates workload results and comparison outcomes of all runs.
        self.result = BenchmarkResult()

    def run_workloads(
        self,
    ) -> BenchmarkResult:
        """Run every configured workload class and return the collected result.

        Raises:
            AssertionError: if a configured class is not a ``Workload`` subclass.
        """
        for workload_cls in self.config.workload_classes:
            assert issubclass(
                workload_cls, Workload
            ), f"{workload_cls} is not a Workload"
            self.run_workload_for_all_endpoints(
                workload_cls,
            )

        return self.result

    def run_workload_for_all_endpoints(
        self,
        workload_cls: type[Workload],
    ):
        """Run one workload on the baseline (if any) and on every other endpoint.

        The comparison outcome of each other endpoint (``None`` when there is
        no baseline) is passed to ``self.result.add_regression``.
        """
        baseline_result = None
        if self.baseline_endpoint is not None:
            baseline_result = self.run_workload_for_endpoint(
                self.baseline_endpoint,
                self.create_workload_instance(
                    workload_cls, endpoint=self.baseline_endpoint
                ),
            )

        for other_endpoint in self.other_endpoints:
            comparison_outcome = self.run_and_evaluate_workload_for_endpoint(
                workload_cls, other_endpoint, baseline_result, try_count=0
            )
            self.result.add_regression(comparison_outcome)

    def run_and_evaluate_workload_for_endpoint(
        self,
        workload_cls: type[Workload],
        other_endpoint: Endpoint,
        baseline_result: WorkloadResult | None,
        try_count: int,
    ) -> ComparisonOutcome | None:
        """Run the workload on ``other_endpoint`` and compare it to the baseline.

        The run is repeated while the comparison reports regressions and
        fewer than ``MAX_RETRIES_ON_REGRESSION`` retries have been used.

        Args:
            try_count: number of retries already consumed (0 for the first run).

        Returns:
            The outcome of the last comparison, or ``None`` when there is no
            baseline endpoint or baseline result (the workload still runs once).
        """
        workload_name = workload_cls.__name__
        other_endpoint_result = self.run_workload_for_endpoint(
            other_endpoint,
            self.create_workload_instance(workload_cls, endpoint=other_endpoint),
        )

        if self.baseline_endpoint is None or baseline_result is None:
            return None

        outcome = self.result_analyzer.perform_comparison_in_workload(
            workload_name,
            self.baseline_endpoint,
            other_endpoint,
            baseline_result,
            other_endpoint_result,
        )

        if outcome.has_regressions() and try_count < MAX_RETRIES_ON_REGRESSION:
            print(
                f"Potential regression in workload {workload_name} at endpoint {other_endpoint},"
                f" triggering retry {try_count + 1} of {MAX_RETRIES_ON_REGRESSION}"
            )
            return self.run_and_evaluate_workload_for_endpoint(
                workload_cls, other_endpoint, baseline_result, try_count=try_count + 1
            )

        return outcome

    def run_workload_for_endpoint(
        self,
        endpoint: Endpoint,
        workload: Workload,
    ) -> WorkloadResult:
        """Run ``workload`` on ``endpoint`` once per configured concurrency.

        After every concurrency step the accumulated totals and details are
        written to the endpoint's CSV files. The final result is recorded in
        ``self.result`` and returned.
        """
        print(f"Running workload {workload.name()} on {endpoint}")

        df_totals = DfTotals()
        df_details = DfDetails()

        concurrencies = self._get_concurrencies()
        print(f"Concurrencies: {concurrencies}")

        for concurrency in concurrencies:
            df_total, df_detail = self.run_workload_for_endpoint_with_concurrency(
                endpoint,
                workload,
                concurrency,
                self.config.get_count_for_concurrency(concurrency),
            )
            df_totals = concat_df_totals([df_totals, df_total])
            df_details = concat_df_details([df_details, df_detail])

            # The version is looked up after the run, which calls endpoint.up();
            # presumably it is only available once the endpoint is up - confirm
            # before hoisting this out of the loop.
            endpoint_version_name = endpoint.try_load_version()
            pathlib.Path(paths.endpoint_dir(endpoint_version_name)).mkdir(
                parents=True, exist_ok=True
            )

            df_totals.to_csv(
                paths.df_totals_csv(endpoint_version_name, workload.name())
            )
            df_details.to_csv(
                paths.df_details_csv(endpoint_version_name, workload.name())
            )

        result = WorkloadResult(workload, endpoint, df_totals, df_details)
        self._record_results(result)
        return result

    def run_workload_for_endpoint_with_concurrency(
        self,
        endpoint: Endpoint,
        workload: Workload,
        concurrency: int,
        count: int,
    ) -> tuple[DfTotals, DfDetails]:
        """Benchmark ``workload`` on ``endpoint`` with ``concurrency`` workers.

        Brings the endpoint up, runs the schema's init statements and the
        workload's init operations on a dedicated connection, and then executes
        ``count`` operations (cycling through ``workload.operations()``) on a
        thread pool in which every worker uses its own cursor. All connections
        opened here are closed before returning.

        Returns:
            A one-row totals frame (wallclock, TPS and duration statistics) and
            a details frame with one row per executed operation.
        """
        # Worker IDs are handed out by initialize_worker through this
        # module-level counter; it is reset below for the new thread pool.
        global next_worker_id

        print(
            f"Preparing benchmark for workload '{workload.name()}' at concurrency {concurrency} ..."
        )
        endpoint.up()

        init_sqls = self.schema.init_sqls()

        init_conn = endpoint.sql_connection()
        cursor_pool: list[Cursor] = []
        try:
            init_conn.autocommit = True
            init_cursor = init_conn.cursor()
            for init_sql in init_sqls:
                print(init_sql)
                init_cursor.execute(init_sql.encode("utf8"))

            for init_operation in workload.init_operations():
                workload.execute_operation(
                    init_operation, init_cursor, -1, -1, self.config.verbose
                )

            print(
                f"Creating a cursor pool with {concurrency} entries against endpoint: {endpoint.url()}"
            )
            cursor_pool = self._create_cursor_pool(concurrency, endpoint)

            print(
                f"Benchmarking workload '{workload.name()}' at concurrency {concurrency} ..."
            )
            operations = workload.operations()
            operation_count = len(operations)

            next_worker_id = 0
            local = threading.local()
            lock = threading.Lock()

            # perf_counter is monotonic, so clock adjustments cannot distort
            # the measured duration.
            start = time.perf_counter()
            with futures.ThreadPoolExecutor(
                concurrency, initializer=self.initialize_worker, initargs=(local, lock)
            ) as executor:
                # Collect the results while the pool is alive so that a worker
                # failure surfaces here rather than when the frame is built.
                measurements = list(
                    executor.map(
                        self.execute_operation,
                        [
                            (
                                workload,
                                concurrency,
                                local,
                                cursor_pool,
                                operations[i % operation_count],
                                i // operation_count,
                            )
                            for i in range(count)
                        ],
                    )
                )
            wallclock_total = time.perf_counter() - start
        finally:
            # Close every connection opened for this run so that connections
            # do not pile up across concurrency steps, workloads and retries.
            # Assumes endpoint.sql_connection() returns a dedicated connection
            # per call - confirm against Endpoint.
            for cursor in cursor_pool:
                cursor.connection.close()
            init_conn.close()

        df_detail = pd.DataFrame(measurements)
        print("Best and worst individual measurements:")
        print(df_detail.sort_values(by=[df_details_cols.WALLCLOCK]))

        tps = count / wallclock_total
        print(
            f"concurrency: {concurrency}; wallclock_total: {wallclock_total}; tps = {tps}"
        )

        durations = df_detail[df_details_cols.WALLCLOCK]
        df_total = pd.DataFrame(
            [
                {
                    df_totals_cols.CONCURRENCY: concurrency,
                    df_totals_cols.WALLCLOCK: wallclock_total,
                    df_totals_cols.WORKLOAD: workload.name(),
                    df_totals_cols.COUNT: count,
                    df_totals_cols.TPS: tps,
                    df_totals_cols.MEAN_TX_DURATION: durations.mean(),
                    df_totals_cols.MEDIAN_TX_DURATION: durations.median(),
                    df_totals_cols.MIN_TX_DURATION: durations.min(),
                    df_totals_cols.MAX_TX_DURATION: durations.max(),
                }
            ]
        )

        return DfTotals(df_total), DfDetails(df_detail)

    def execute_operation(
        self, args: tuple[Workload, int, threading.local, list[Cursor], Operation, int]
    ) -> dict[str, Any]:
        """Execute one operation on the calling worker's cursor and time it.

        Args:
            args: ``(workload, concurrency, local, cursor_pool, operation,
                transaction_index)``; ``local.worker_id`` selects the cursor.

        Returns:
            One details row keyed by the ``df_details_cols`` column names.
        """
        workload, concurrency, local, cursor_pool, operation, transaction_index = args
        worker_id = local.worker_id
        assert (
            len(cursor_pool) >= worker_id + 1
        ), f"len(cursor_pool) is {len(cursor_pool)} but local.worker_id is {worker_id}"
        cursor = cursor_pool[worker_id]

        # perf_counter is monotonic, unlike time.time().
        start = time.perf_counter()
        workload.execute_operation(
            operation, cursor, worker_id, transaction_index, self.config.verbose
        )
        wallclock = time.perf_counter() - start

        return {
            df_details_cols.CONCURRENCY: concurrency,
            df_details_cols.WALLCLOCK: wallclock,
            df_details_cols.OPERATION: type(operation).__name__,
            df_details_cols.WORKLOAD: workload.name(),
            df_details_cols.TRANSACTION_INDEX: transaction_index,
        }

    def create_workload_instance(
        self, workload_cls: type[Workload], endpoint: Endpoint
    ) -> Workload:
        """Instantiate ``workload_cls``; context-aware workloads also receive
        the endpoint and the schema."""
        workload = workload_cls()

        if isinstance(workload, WorkloadWithContext):
            workload.set_endpoint(endpoint)
            workload.set_schema(self.schema)

        return workload

    def initialize_worker(self, local: threading.local, lock: threading.Lock):
        """Give each worker thread a unique ID.

        The ID is taken from the module-level ``next_worker_id`` counter and
        stored as ``local.worker_id``; ``lock`` serializes the increment.
        """
        global next_worker_id
        # The context manager releases the lock even if the body raises.
        with lock:
            local.worker_id = next_worker_id
            next_worker_id = next_worker_id + 1

    def _get_concurrencies(self) -> list[int]:
        """Return the sorted, de-duplicated concurrency levels to benchmark.

        Levels are the rounded powers of ``config.exponent_base`` that lie
        within ``[config.min_concurrency, config.max_concurrency]``.
        """
        # 1024 exponents are sampled for bases below 2.0, 32 otherwise.
        range_end = 1024 if self.config.exponent_base < 2.0 else 32
        levels = {round(self.config.exponent_base**c) for c in range(range_end)}
        return [
            level
            for level in sorted(levels)
            if self.config.min_concurrency <= level <= self.config.max_concurrency
        ]

    def _create_cursor_pool(self, concurrency: int, endpoint: Endpoint) -> list[Cursor]:
        """Open ``concurrency`` autocommit connections and return their cursors.

        The schema's connect statements are executed on every cursor.
        """
        connect_sqls = self.schema.connect_sqls()

        cursor_pool = []
        for _ in range(concurrency):
            conn = endpoint.sql_connection()
            conn.autocommit = True
            cursor = conn.cursor()
            for connect_sql in connect_sqls:
                cursor.execute(connect_sql.encode("utf8"))
            cursor_pool.append(cursor)

        return cursor_pool

    def _record_results(self, result: WorkloadResult) -> None:
        """Append ``result`` to ``self.result`` under the endpoint's version."""
        endpoint_version_info = result.endpoint.try_load_version()
        print(
            f"Collecting results of endpoint {result.endpoint} with name {endpoint_version_info}"
        )

        self.result.append_workload_result(endpoint_version_info, result)

Classes

class BenchmarkExecutor (config: BenchmarkConfiguration, schema: Schema, baseline_endpoint: Endpoint | None, other_endpoints: list[Endpoint], result_analyzer: ResultAnalyzer)
Expand source code Browse git
class BenchmarkExecutor:
    """Run scalability workloads against endpoints and collect their results.

    Every configured workload class is run against the optional baseline
    endpoint and against each of the other endpoints. When a baseline result
    exists, the result analyzer compares both results, and a run that reports
    regressions is repeated up to ``MAX_RETRIES_ON_REGRESSION`` times.
    """

    def __init__(
        self,
        config: BenchmarkConfiguration,
        schema: Schema,
        baseline_endpoint: Endpoint | None,
        other_endpoints: list[Endpoint],
        result_analyzer: ResultAnalyzer,
    ):
        """Store the collaborators and start with an empty ``BenchmarkResult``."""
        self.config = config
        self.schema = schema
        self.baseline_endpoint = baseline_endpoint
        self.other_endpoints = other_endpoints
        self.result_analyzer = result_analyzer
        # Accumulates workload results and comparison outcomes of all runs.
        self.result = BenchmarkResult()

    def run_workloads(
        self,
    ) -> BenchmarkResult:
        """Run every configured workload class and return the collected result.

        Raises:
            AssertionError: if a configured class is not a ``Workload`` subclass.
        """
        for workload_cls in self.config.workload_classes:
            assert issubclass(
                workload_cls, Workload
            ), f"{workload_cls} is not a Workload"
            self.run_workload_for_all_endpoints(
                workload_cls,
            )

        return self.result

    def run_workload_for_all_endpoints(
        self,
        workload_cls: type[Workload],
    ):
        """Run one workload on the baseline (if any) and on every other endpoint.

        The comparison outcome of each other endpoint (``None`` when there is
        no baseline) is passed to ``self.result.add_regression``.
        """
        baseline_result = None
        if self.baseline_endpoint is not None:
            baseline_result = self.run_workload_for_endpoint(
                self.baseline_endpoint,
                self.create_workload_instance(
                    workload_cls, endpoint=self.baseline_endpoint
                ),
            )

        for other_endpoint in self.other_endpoints:
            comparison_outcome = self.run_and_evaluate_workload_for_endpoint(
                workload_cls, other_endpoint, baseline_result, try_count=0
            )
            self.result.add_regression(comparison_outcome)

    def run_and_evaluate_workload_for_endpoint(
        self,
        workload_cls: type[Workload],
        other_endpoint: Endpoint,
        baseline_result: WorkloadResult | None,
        try_count: int,
    ) -> ComparisonOutcome | None:
        """Run the workload on ``other_endpoint`` and compare it to the baseline.

        The run is repeated while the comparison reports regressions and
        fewer than ``MAX_RETRIES_ON_REGRESSION`` retries have been used.

        Args:
            try_count: number of retries already consumed (0 for the first run).

        Returns:
            The outcome of the last comparison, or ``None`` when there is no
            baseline endpoint or baseline result (the workload still runs once).
        """
        workload_name = workload_cls.__name__
        other_endpoint_result = self.run_workload_for_endpoint(
            other_endpoint,
            self.create_workload_instance(workload_cls, endpoint=other_endpoint),
        )

        if self.baseline_endpoint is None or baseline_result is None:
            return None

        outcome = self.result_analyzer.perform_comparison_in_workload(
            workload_name,
            self.baseline_endpoint,
            other_endpoint,
            baseline_result,
            other_endpoint_result,
        )

        if outcome.has_regressions() and try_count < MAX_RETRIES_ON_REGRESSION:
            print(
                f"Potential regression in workload {workload_name} at endpoint {other_endpoint},"
                f" triggering retry {try_count + 1} of {MAX_RETRIES_ON_REGRESSION}"
            )
            return self.run_and_evaluate_workload_for_endpoint(
                workload_cls, other_endpoint, baseline_result, try_count=try_count + 1
            )

        return outcome

    def run_workload_for_endpoint(
        self,
        endpoint: Endpoint,
        workload: Workload,
    ) -> WorkloadResult:
        """Run ``workload`` on ``endpoint`` once per configured concurrency.

        After every concurrency step the accumulated totals and details are
        written to the endpoint's CSV files. The final result is recorded in
        ``self.result`` and returned.
        """
        print(f"Running workload {workload.name()} on {endpoint}")

        df_totals = DfTotals()
        df_details = DfDetails()

        concurrencies = self._get_concurrencies()
        print(f"Concurrencies: {concurrencies}")

        for concurrency in concurrencies:
            df_total, df_detail = self.run_workload_for_endpoint_with_concurrency(
                endpoint,
                workload,
                concurrency,
                self.config.get_count_for_concurrency(concurrency),
            )
            df_totals = concat_df_totals([df_totals, df_total])
            df_details = concat_df_details([df_details, df_detail])

            # The version is looked up after the run, which calls endpoint.up();
            # presumably it is only available once the endpoint is up - confirm
            # before hoisting this out of the loop.
            endpoint_version_name = endpoint.try_load_version()
            pathlib.Path(paths.endpoint_dir(endpoint_version_name)).mkdir(
                parents=True, exist_ok=True
            )

            df_totals.to_csv(
                paths.df_totals_csv(endpoint_version_name, workload.name())
            )
            df_details.to_csv(
                paths.df_details_csv(endpoint_version_name, workload.name())
            )

        result = WorkloadResult(workload, endpoint, df_totals, df_details)
        self._record_results(result)
        return result

    def run_workload_for_endpoint_with_concurrency(
        self,
        endpoint: Endpoint,
        workload: Workload,
        concurrency: int,
        count: int,
    ) -> tuple[DfTotals, DfDetails]:
        """Benchmark ``workload`` on ``endpoint`` with ``concurrency`` workers.

        Brings the endpoint up, runs the schema's init statements and the
        workload's init operations on a dedicated connection, and then executes
        ``count`` operations (cycling through ``workload.operations()``) on a
        thread pool in which every worker uses its own cursor. All connections
        opened here are closed before returning.

        Returns:
            A one-row totals frame (wallclock, TPS and duration statistics) and
            a details frame with one row per executed operation.
        """
        # Worker IDs are handed out by initialize_worker through this
        # module-level counter; it is reset below for the new thread pool.
        global next_worker_id

        print(
            f"Preparing benchmark for workload '{workload.name()}' at concurrency {concurrency} ..."
        )
        endpoint.up()

        init_sqls = self.schema.init_sqls()

        init_conn = endpoint.sql_connection()
        cursor_pool: list[Cursor] = []
        try:
            init_conn.autocommit = True
            init_cursor = init_conn.cursor()
            for init_sql in init_sqls:
                print(init_sql)
                init_cursor.execute(init_sql.encode("utf8"))

            for init_operation in workload.init_operations():
                workload.execute_operation(
                    init_operation, init_cursor, -1, -1, self.config.verbose
                )

            print(
                f"Creating a cursor pool with {concurrency} entries against endpoint: {endpoint.url()}"
            )
            cursor_pool = self._create_cursor_pool(concurrency, endpoint)

            print(
                f"Benchmarking workload '{workload.name()}' at concurrency {concurrency} ..."
            )
            operations = workload.operations()
            operation_count = len(operations)

            next_worker_id = 0
            local = threading.local()
            lock = threading.Lock()

            # perf_counter is monotonic, so clock adjustments cannot distort
            # the measured duration.
            start = time.perf_counter()
            with futures.ThreadPoolExecutor(
                concurrency, initializer=self.initialize_worker, initargs=(local, lock)
            ) as executor:
                # Collect the results while the pool is alive so that a worker
                # failure surfaces here rather than when the frame is built.
                measurements = list(
                    executor.map(
                        self.execute_operation,
                        [
                            (
                                workload,
                                concurrency,
                                local,
                                cursor_pool,
                                operations[i % operation_count],
                                i // operation_count,
                            )
                            for i in range(count)
                        ],
                    )
                )
            wallclock_total = time.perf_counter() - start
        finally:
            # Close every connection opened for this run so that connections
            # do not pile up across concurrency steps, workloads and retries.
            # Assumes endpoint.sql_connection() returns a dedicated connection
            # per call - confirm against Endpoint.
            for cursor in cursor_pool:
                cursor.connection.close()
            init_conn.close()

        df_detail = pd.DataFrame(measurements)
        print("Best and worst individual measurements:")
        print(df_detail.sort_values(by=[df_details_cols.WALLCLOCK]))

        tps = count / wallclock_total
        print(
            f"concurrency: {concurrency}; wallclock_total: {wallclock_total}; tps = {tps}"
        )

        durations = df_detail[df_details_cols.WALLCLOCK]
        df_total = pd.DataFrame(
            [
                {
                    df_totals_cols.CONCURRENCY: concurrency,
                    df_totals_cols.WALLCLOCK: wallclock_total,
                    df_totals_cols.WORKLOAD: workload.name(),
                    df_totals_cols.COUNT: count,
                    df_totals_cols.TPS: tps,
                    df_totals_cols.MEAN_TX_DURATION: durations.mean(),
                    df_totals_cols.MEDIAN_TX_DURATION: durations.median(),
                    df_totals_cols.MIN_TX_DURATION: durations.min(),
                    df_totals_cols.MAX_TX_DURATION: durations.max(),
                }
            ]
        )

        return DfTotals(df_total), DfDetails(df_detail)

    def execute_operation(
        self, args: tuple[Workload, int, threading.local, list[Cursor], Operation, int]
    ) -> dict[str, Any]:
        """Execute one operation on the calling worker's cursor and time it.

        Args:
            args: ``(workload, concurrency, local, cursor_pool, operation,
                transaction_index)``; ``local.worker_id`` selects the cursor.

        Returns:
            One details row keyed by the ``df_details_cols`` column names.
        """
        workload, concurrency, local, cursor_pool, operation, transaction_index = args
        worker_id = local.worker_id
        assert (
            len(cursor_pool) >= worker_id + 1
        ), f"len(cursor_pool) is {len(cursor_pool)} but local.worker_id is {worker_id}"
        cursor = cursor_pool[worker_id]

        # perf_counter is monotonic, unlike time.time().
        start = time.perf_counter()
        workload.execute_operation(
            operation, cursor, worker_id, transaction_index, self.config.verbose
        )
        wallclock = time.perf_counter() - start

        return {
            df_details_cols.CONCURRENCY: concurrency,
            df_details_cols.WALLCLOCK: wallclock,
            df_details_cols.OPERATION: type(operation).__name__,
            df_details_cols.WORKLOAD: workload.name(),
            df_details_cols.TRANSACTION_INDEX: transaction_index,
        }

    def create_workload_instance(
        self, workload_cls: type[Workload], endpoint: Endpoint
    ) -> Workload:
        """Instantiate ``workload_cls``; context-aware workloads also receive
        the endpoint and the schema."""
        workload = workload_cls()

        if isinstance(workload, WorkloadWithContext):
            workload.set_endpoint(endpoint)
            workload.set_schema(self.schema)

        return workload

    def initialize_worker(self, local: threading.local, lock: threading.Lock):
        """Give each worker thread a unique ID.

        The ID is taken from the module-level ``next_worker_id`` counter and
        stored as ``local.worker_id``; ``lock`` serializes the increment.
        """
        global next_worker_id
        # The context manager releases the lock even if the body raises.
        with lock:
            local.worker_id = next_worker_id
            next_worker_id = next_worker_id + 1

    def _get_concurrencies(self) -> list[int]:
        """Return the sorted, de-duplicated concurrency levels to benchmark.

        Levels are the rounded powers of ``config.exponent_base`` that lie
        within ``[config.min_concurrency, config.max_concurrency]``.
        """
        # 1024 exponents are sampled for bases below 2.0, 32 otherwise.
        range_end = 1024 if self.config.exponent_base < 2.0 else 32
        levels = {round(self.config.exponent_base**c) for c in range(range_end)}
        return [
            level
            for level in sorted(levels)
            if self.config.min_concurrency <= level <= self.config.max_concurrency
        ]

    def _create_cursor_pool(self, concurrency: int, endpoint: Endpoint) -> list[Cursor]:
        """Open ``concurrency`` autocommit connections and return their cursors.

        The schema's connect statements are executed on every cursor.
        """
        connect_sqls = self.schema.connect_sqls()

        cursor_pool = []
        for _ in range(concurrency):
            conn = endpoint.sql_connection()
            conn.autocommit = True
            cursor = conn.cursor()
            for connect_sql in connect_sqls:
                cursor.execute(connect_sql.encode("utf8"))
            cursor_pool.append(cursor)

        return cursor_pool

    def _record_results(self, result: WorkloadResult) -> None:
        """Append ``result`` to ``self.result`` under the endpoint's version."""
        endpoint_version_info = result.endpoint.try_load_version()
        print(
            f"Collecting results of endpoint {result.endpoint} with name {endpoint_version_info}"
        )

        self.result.append_workload_result(endpoint_version_info, result)

Methods

def create_workload_instance(self, workload_cls: type[Workload], endpoint: Endpoint) ‑> Workload
Expand source code Browse git
def create_workload_instance(
    self, workload_cls: type[Workload], endpoint: Endpoint
) -> Workload:
    """Build an instance of ``workload_cls`` for ``endpoint``.

    Workloads that are ``WorkloadWithContext`` instances additionally get the
    endpoint and this executor's schema injected.
    """
    instance = workload_cls()

    # Plain workloads need no context.
    if not isinstance(instance, WorkloadWithContext):
        return instance

    instance.set_endpoint(endpoint)
    instance.set_schema(self.schema)
    return instance
def execute_operation(self, args: tuple[Workload, int, _thread._local, list[psycopg.Cursor], Operation, int]) ‑> dict[str, typing.Any]
Expand source code Browse git
def execute_operation(
    self, args: tuple[Workload, int, threading.local, list[Cursor], Operation, int]
) -> dict[str, Any]:
    """Execute one operation on the calling worker's cursor and time it.

    Args:
        args: ``(workload, concurrency, local, cursor_pool, operation,
            transaction_index)``; ``local.worker_id`` selects the cursor.

    Returns:
        One details row keyed by the ``df_details_cols`` column names.

    Raises:
        AssertionError: if the worker ID has no matching cursor in the pool.
    """
    workload, concurrency, local, cursor_pool, operation, transaction_index = args
    worker_id = local.worker_id
    assert (
        len(cursor_pool) >= worker_id + 1
    ), f"len(cursor_pool) is {len(cursor_pool)} but local.worker_id is {worker_id}"
    cursor = cursor_pool[worker_id]

    # perf_counter is monotonic, so a system clock adjustment during the
    # operation cannot produce a wrong or negative duration.
    start = time.perf_counter()
    workload.execute_operation(
        operation, cursor, worker_id, transaction_index, self.config.verbose
    )
    wallclock = time.perf_counter() - start

    return {
        df_details_cols.CONCURRENCY: concurrency,
        df_details_cols.WALLCLOCK: wallclock,
        df_details_cols.OPERATION: type(operation).__name__,
        df_details_cols.WORKLOAD: workload.name(),
        df_details_cols.TRANSACTION_INDEX: transaction_index,
    }
def initialize_worker(self, local: _thread._local, lock: threading.Lock)

Give each worker thread a unique ID

Expand source code Browse git
def initialize_worker(self, local: threading.local, lock: threading.Lock):
    """Give each worker thread a unique ID.

    The ID is taken from the module-level ``next_worker_id`` counter and
    stored as ``local.worker_id``; ``lock`` serializes the increment.
    """
    global next_worker_id
    # The context manager releases the lock even if the body raises, so a
    # failing initializer cannot leave the lock held.
    with lock:
        local.worker_id = next_worker_id
        next_worker_id = next_worker_id + 1
def run_and_evaluate_workload_for_endpoint(self, workload_cls: type[Workload], other_endpoint: Endpoint, baseline_result: WorkloadResult | None, try_count: int) ‑> ComparisonOutcome | None
Expand source code Browse git
def run_and_evaluate_workload_for_endpoint(
    self,
    workload_cls: type[Workload],
    other_endpoint: Endpoint,
    baseline_result: WorkloadResult | None,
    try_count: int,
) -> ComparisonOutcome | None:
    """Run the workload on ``other_endpoint`` and compare it to the baseline.

    The run is repeated while the comparison reports regressions and fewer
    than ``MAX_RETRIES_ON_REGRESSION`` retries have been used.

    Args:
        try_count: number of retries already consumed (0 for the first run).

    Returns:
        The outcome of the last comparison, or ``None`` when there is no
        baseline endpoint or baseline result (the workload still runs once).
    """
    workload_name = workload_cls.__name__
    retries_used = try_count

    while True:
        candidate_result = self.run_workload_for_endpoint(
            other_endpoint,
            self.create_workload_instance(workload_cls, endpoint=other_endpoint),
        )

        # Without a baseline there is nothing to compare against.
        if self.baseline_endpoint is None or baseline_result is None:
            return None

        outcome = self.result_analyzer.perform_comparison_in_workload(
            workload_name,
            self.baseline_endpoint,
            other_endpoint,
            baseline_result,
            candidate_result,
        )

        if not outcome.has_regressions() or retries_used >= MAX_RETRIES_ON_REGRESSION:
            return outcome

        print(
            f"Potential regression in workload {workload_name} at endpoint {other_endpoint},"
            f" triggering retry {retries_used + 1} of {MAX_RETRIES_ON_REGRESSION}"
        )
        retries_used += 1
def run_workload_for_all_endpoints(self, workload_cls: type[Workload])
Expand source code Browse git
def run_workload_for_all_endpoints(
    self,
    workload_cls: type[Workload],
):
    """Run one workload on the baseline (if any) and on every other endpoint.

    The comparison outcome of each other endpoint (``None`` when there is no
    baseline) is passed to ``self.result.add_regression``.
    """
    # Stays None when no baseline endpoint is configured.
    baseline_result = None
    if self.baseline_endpoint is not None:
        baseline_result = self.run_workload_for_endpoint(
            self.baseline_endpoint,
            self.create_workload_instance(
                workload_cls, endpoint=self.baseline_endpoint
            ),
        )

    for endpoint in self.other_endpoints:
        outcome = self.run_and_evaluate_workload_for_endpoint(
            workload_cls, endpoint, baseline_result, try_count=0
        )
        self.result.add_regression(outcome)
def run_workload_for_endpoint(self, endpoint: Endpoint, workload: Workload) ‑> WorkloadResult
Expand source code Browse git
def run_workload_for_endpoint(
    self,
    endpoint: Endpoint,
    workload: Workload,
) -> WorkloadResult:
    """Run ``workload`` on ``endpoint`` once per configured concurrency.

    After every concurrency step the accumulated totals and details are
    written to the endpoint's CSV files. The final result is recorded in
    ``self.result`` and returned.
    """
    print(f"Running workload {workload.name()} on {endpoint}")

    totals = DfTotals()
    details = DfDetails()

    concurrencies = self._get_concurrencies()
    print(f"Concurrencies: {concurrencies}")

    for concurrency in concurrencies:
        count = self.config.get_count_for_concurrency(concurrency)
        step_totals, step_details = self.run_workload_for_endpoint_with_concurrency(
            endpoint, workload, concurrency, count
        )
        totals = concat_df_totals([totals, step_totals])
        details = concat_df_details([details, step_details])

        # The version is looked up after the run, which calls endpoint.up().
        version_name = endpoint.try_load_version()
        output_dir = pathlib.Path(paths.endpoint_dir(version_name))
        output_dir.mkdir(parents=True, exist_ok=True)

        # Rewrite both CSV files with everything accumulated so far.
        totals_path = paths.df_totals_csv(version_name, workload.name())
        totals.to_csv(totals_path)
        details_path = paths.df_details_csv(version_name, workload.name())
        details.to_csv(details_path)

    workload_result = WorkloadResult(workload, endpoint, totals, details)
    self._record_results(workload_result)
    return workload_result
def run_workload_for_endpoint_with_concurrency(self, endpoint: Endpoint, workload: Workload, concurrency: int, count: int) ‑> tuple[DfTotals, DfDetails]
Expand source code Browse git
def run_workload_for_endpoint_with_concurrency(
    self,
    endpoint: Endpoint,
    workload: Workload,
    concurrency: int,
    count: int,
) -> tuple[DfTotals, DfDetails]:
    """Benchmark ``workload`` on ``endpoint`` with ``concurrency`` workers.

    Brings the endpoint up, runs the schema's init statements and the
    workload's init operations on a dedicated connection, and then executes
    ``count`` operations (cycling through ``workload.operations()``) on a
    thread pool in which every worker uses its own cursor. All connections
    opened here are closed before returning.

    Returns:
        A one-row totals frame (wallclock, TPS and duration statistics) and a
        details frame with one row per executed operation.
    """
    # Worker IDs are handed out by initialize_worker through this module-level
    # counter; it is reset below for the new thread pool.
    global next_worker_id

    print(
        f"Preparing benchmark for workload '{workload.name()}' at concurrency {concurrency} ..."
    )
    endpoint.up()

    init_sqls = self.schema.init_sqls()

    init_conn = endpoint.sql_connection()
    cursor_pool: list[Cursor] = []
    try:
        init_conn.autocommit = True
        init_cursor = init_conn.cursor()
        for init_sql in init_sqls:
            print(init_sql)
            init_cursor.execute(init_sql.encode("utf8"))

        for init_operation in workload.init_operations():
            workload.execute_operation(
                init_operation, init_cursor, -1, -1, self.config.verbose
            )

        print(
            f"Creating a cursor pool with {concurrency} entries against endpoint: {endpoint.url()}"
        )
        cursor_pool = self._create_cursor_pool(concurrency, endpoint)

        print(
            f"Benchmarking workload '{workload.name()}' at concurrency {concurrency} ..."
        )
        operations = workload.operations()
        operation_count = len(operations)

        next_worker_id = 0
        local = threading.local()
        lock = threading.Lock()

        # perf_counter is monotonic, so clock adjustments cannot distort the
        # measured duration.
        start = time.perf_counter()
        with futures.ThreadPoolExecutor(
            concurrency, initializer=self.initialize_worker, initargs=(local, lock)
        ) as executor:
            # Collect the results while the pool is alive so that a worker
            # failure surfaces here rather than when the frame is built.
            measurements = list(
                executor.map(
                    self.execute_operation,
                    [
                        (
                            workload,
                            concurrency,
                            local,
                            cursor_pool,
                            operations[i % operation_count],
                            i // operation_count,
                        )
                        for i in range(count)
                    ],
                )
            )
        wallclock_total = time.perf_counter() - start
    finally:
        # Close every connection opened for this run so that connections do
        # not pile up across concurrency steps, workloads and retries.
        # Assumes endpoint.sql_connection() returns a dedicated connection per
        # call - confirm against Endpoint.
        for cursor in cursor_pool:
            cursor.connection.close()
        init_conn.close()

    df_detail = pd.DataFrame(measurements)
    print("Best and worst individual measurements:")
    print(df_detail.sort_values(by=[df_details_cols.WALLCLOCK]))

    tps = count / wallclock_total
    print(
        f"concurrency: {concurrency}; wallclock_total: {wallclock_total}; tps = {tps}"
    )

    durations = df_detail[df_details_cols.WALLCLOCK]
    df_total = pd.DataFrame(
        [
            {
                df_totals_cols.CONCURRENCY: concurrency,
                df_totals_cols.WALLCLOCK: wallclock_total,
                df_totals_cols.WORKLOAD: workload.name(),
                df_totals_cols.COUNT: count,
                df_totals_cols.TPS: tps,
                df_totals_cols.MEAN_TX_DURATION: durations.mean(),
                df_totals_cols.MEDIAN_TX_DURATION: durations.median(),
                df_totals_cols.MIN_TX_DURATION: durations.min(),
                df_totals_cols.MAX_TX_DURATION: durations.max(),
            }
        ]
    )

    return DfTotals(df_total), DfDetails(df_detail)
def run_workloads(self) ‑> BenchmarkResult
Expand source code Browse git
def run_workloads(
    self,
) -> BenchmarkResult:
    """Run every configured workload class and return the collected result.

    Raises:
        AssertionError: if a configured class is not a ``Workload`` subclass.
    """
    for candidate_cls in self.config.workload_classes:
        is_workload = issubclass(candidate_cls, Workload)
        assert is_workload, f"{candidate_cls} is not a Workload"
        self.run_workload_for_all_endpoints(candidate_cls)

    return self.result