Module materialize.scalability.benchmark_executor
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import pathlib
import threading
import time
from concurrent import futures
from typing import Any
import pandas as pd
from psycopg import Cursor
from materialize.scalability.benchmark_config import BenchmarkConfiguration
from materialize.scalability.benchmark_result import BenchmarkResult
from materialize.scalability.comparison_outcome import ComparisonOutcome
from materialize.scalability.df import df_details_cols, df_totals_cols
from materialize.scalability.df.df_details import DfDetails, concat_df_details
from materialize.scalability.df.df_totals import DfTotals, concat_df_totals
from materialize.scalability.endpoint import Endpoint
from materialize.scalability.io import paths
from materialize.scalability.operation import Operation
from materialize.scalability.result_analyzer import ResultAnalyzer
from materialize.scalability.schema import Schema
from materialize.scalability.workload import Workload, WorkloadWithContext
from materialize.scalability.workload_result import WorkloadResult
from materialize.scalability.workloads import * # noqa: F401 F403
from materialize.scalability.workloads_test import * # noqa: F401 F403
# number of retries in addition to the first run
MAX_RETRIES_ON_REGRESSION = 2
class BenchmarkExecutor:
def __init__(
self,
config: BenchmarkConfiguration,
schema: Schema,
baseline_endpoint: Endpoint | None,
other_endpoints: list[Endpoint],
result_analyzer: ResultAnalyzer,
):
self.config = config
self.schema = schema
self.baseline_endpoint = baseline_endpoint
self.other_endpoints = other_endpoints
self.result_analyzer = result_analyzer
self.result = BenchmarkResult()
def run_workloads(
self,
) -> BenchmarkResult:
for workload_cls in self.config.workload_classes:
assert issubclass(
workload_cls, Workload
), f"{workload_cls} is not a Workload"
self.run_workload_for_all_endpoints(
workload_cls,
)
return self.result
def run_workload_for_all_endpoints(
self,
workload_cls: type[Workload],
):
if self.baseline_endpoint is not None:
baseline_result = self.run_workload_for_endpoint(
self.baseline_endpoint,
self.create_workload_instance(
workload_cls, endpoint=self.baseline_endpoint
),
)
else:
baseline_result = None
for other_endpoint in self.other_endpoints:
comparison_outcome = self.run_and_evaluate_workload_for_endpoint(
workload_cls, other_endpoint, baseline_result, try_count=0
)
self.result.add_regression(comparison_outcome)
def run_and_evaluate_workload_for_endpoint(
self,
workload_cls: type[Workload],
other_endpoint: Endpoint,
baseline_result: WorkloadResult | None,
try_count: int,
) -> ComparisonOutcome | None:
workload_name = workload_cls.__name__
other_endpoint_result = self.run_workload_for_endpoint(
other_endpoint,
self.create_workload_instance(workload_cls, endpoint=other_endpoint),
)
if self.baseline_endpoint is None or baseline_result is None:
return None
outcome = self.result_analyzer.perform_comparison_in_workload(
workload_name,
self.baseline_endpoint,
other_endpoint,
baseline_result,
other_endpoint_result,
)
if outcome.has_regressions() and try_count < MAX_RETRIES_ON_REGRESSION:
print(
f"Potential regression in workload {workload_name} at endpoint {other_endpoint},"
f" triggering retry {try_count + 1} of {MAX_RETRIES_ON_REGRESSION}"
)
return self.run_and_evaluate_workload_for_endpoint(
workload_cls, other_endpoint, baseline_result, try_count=try_count + 1
)
return outcome
def run_workload_for_endpoint(
self,
endpoint: Endpoint,
workload: Workload,
) -> WorkloadResult:
print(f"Running workload {workload.name()} on {endpoint}")
df_totals = DfTotals()
df_details = DfDetails()
concurrencies = self._get_concurrencies()
print(f"Concurrencies: {concurrencies}")
for concurrency in concurrencies:
df_total, df_detail = self.run_workload_for_endpoint_with_concurrency(
endpoint,
workload,
concurrency,
self.config.get_count_for_concurrency(concurrency),
)
df_totals = concat_df_totals([df_totals, df_total])
df_details = concat_df_details([df_details, df_detail])
endpoint_version_name = endpoint.try_load_version()
pathlib.Path(paths.endpoint_dir(endpoint_version_name)).mkdir(
parents=True, exist_ok=True
)
df_totals.to_csv(
paths.df_totals_csv(endpoint_version_name, workload.name())
)
df_details.to_csv(
paths.df_details_csv(endpoint_version_name, workload.name())
)
result = WorkloadResult(workload, endpoint, df_totals, df_details)
self._record_results(result)
return result
def run_workload_for_endpoint_with_concurrency(
self,
endpoint: Endpoint,
workload: Workload,
concurrency: int,
count: int,
) -> tuple[DfTotals, DfDetails]:
print(
f"Preparing benchmark for workload '{workload.name()}' at concurrency {concurrency} ..."
)
endpoint.up()
init_sqls = self.schema.init_sqls()
init_conn = endpoint.sql_connection()
init_conn.autocommit = True
init_cursor = init_conn.cursor()
for init_sql in init_sqls:
print(init_sql)
init_cursor.execute(init_sql.encode("utf8"))
for init_operation in workload.init_operations():
workload.execute_operation(
init_operation, init_cursor, -1, -1, self.config.verbose
)
print(
f"Creating a cursor pool with {concurrency} entries against endpoint: {endpoint.url()}"
)
cursor_pool = self._create_cursor_pool(concurrency, endpoint)
print(
f"Benchmarking workload '{workload.name()}' at concurrency {concurrency} ..."
)
operations = workload.operations()
global next_worker_id
next_worker_id = 0
local = threading.local()
lock = threading.Lock()
start = time.time()
with futures.ThreadPoolExecutor(
concurrency, initializer=self.initialize_worker, initargs=(local, lock)
) as executor:
measurements = executor.map(
self.execute_operation,
[
(
workload,
concurrency,
local,
cursor_pool,
operations[i % len(operations)],
int(i / len(operations)),
)
for i in range(count)
],
)
wallclock_total = time.time() - start
df_detail = pd.DataFrame(measurements)
print("Best and worst individual measurements:")
print(df_detail.sort_values(by=[df_details_cols.WALLCLOCK]))
print(
f"concurrency: {concurrency}; wallclock_total: {wallclock_total}; tps = {count/wallclock_total}"
)
df_total = pd.DataFrame(
[
{
df_totals_cols.CONCURRENCY: concurrency,
df_totals_cols.WALLCLOCK: wallclock_total,
df_totals_cols.WORKLOAD: workload.name(),
df_totals_cols.COUNT: count,
df_totals_cols.TPS: count / wallclock_total,
df_totals_cols.MEAN_TX_DURATION: df_detail[
df_details_cols.WALLCLOCK
].mean(),
df_totals_cols.MEDIAN_TX_DURATION: df_detail[
df_details_cols.WALLCLOCK
].median(),
df_totals_cols.MIN_TX_DURATION: df_detail[
df_details_cols.WALLCLOCK
].min(),
df_totals_cols.MAX_TX_DURATION: df_detail[
df_details_cols.WALLCLOCK
].max(),
}
]
)
return DfTotals(df_total), DfDetails(df_detail)
def execute_operation(
self, args: tuple[Workload, int, threading.local, list[Cursor], Operation, int]
) -> dict[str, Any]:
workload, concurrency, local, cursor_pool, operation, transaction_index = args
worker_id = local.worker_id
assert (
len(cursor_pool) >= worker_id + 1
), f"len(cursor_pool) is {len(cursor_pool)} but local.worker_id is {worker_id}"
cursor = cursor_pool[worker_id]
start = time.time()
workload.execute_operation(
operation, cursor, worker_id, transaction_index, self.config.verbose
)
wallclock = time.time() - start
return {
df_details_cols.CONCURRENCY: concurrency,
df_details_cols.WALLCLOCK: wallclock,
df_details_cols.OPERATION: type(operation).__name__,
df_details_cols.WORKLOAD: workload.name(),
df_details_cols.TRANSACTION_INDEX: transaction_index,
}
def create_workload_instance(
self, workload_cls: type[Workload], endpoint: Endpoint
) -> Workload:
workload = workload_cls()
if isinstance(workload, WorkloadWithContext):
workload.set_endpoint(endpoint)
workload.set_schema(self.schema)
return workload
def initialize_worker(self, local: threading.local, lock: threading.Lock):
"""Give each other worker thread a unique ID"""
lock.acquire()
global next_worker_id
local.worker_id = next_worker_id
next_worker_id = next_worker_id + 1
lock.release()
def _get_concurrencies(self) -> list[int]:
range_end = 1024 if self.config.exponent_base < 2.0 else 32
concurrencies: list[int] = [
round(self.config.exponent_base**c) for c in range(0, range_end)
]
concurrencies = sorted(set(concurrencies))
return [
c
for c in concurrencies
if self.config.min_concurrency <= c <= self.config.max_concurrency
]
def _create_cursor_pool(self, concurrency: int, endpoint: Endpoint) -> list[Cursor]:
connect_sqls = self.schema.connect_sqls()
cursor_pool = []
for i in range(concurrency):
conn = endpoint.sql_connection()
conn.autocommit = True
cursor = conn.cursor()
for connect_sql in connect_sqls:
cursor.execute(connect_sql.encode("utf8"))
cursor_pool.append(cursor)
return cursor_pool
def _record_results(self, result: WorkloadResult) -> None:
endpoint_version_info = result.endpoint.try_load_version()
print(
f"Collecting results of endpoint {result.endpoint} with name {endpoint_version_info}"
)
self.result.append_workload_result(endpoint_version_info, result)
Classes
class BenchmarkExecutor (config: BenchmarkConfiguration, schema: Schema, baseline_endpoint: Endpoint | None, other_endpoints: list[Endpoint], result_analyzer: ResultAnalyzer)
-
Expand source code Browse git
class BenchmarkExecutor: def __init__( self, config: BenchmarkConfiguration, schema: Schema, baseline_endpoint: Endpoint | None, other_endpoints: list[Endpoint], result_analyzer: ResultAnalyzer, ): self.config = config self.schema = schema self.baseline_endpoint = baseline_endpoint self.other_endpoints = other_endpoints self.result_analyzer = result_analyzer self.result = BenchmarkResult() def run_workloads( self, ) -> BenchmarkResult: for workload_cls in self.config.workload_classes: assert issubclass( workload_cls, Workload ), f"{workload_cls} is not a Workload" self.run_workload_for_all_endpoints( workload_cls, ) return self.result def run_workload_for_all_endpoints( self, workload_cls: type[Workload], ): if self.baseline_endpoint is not None: baseline_result = self.run_workload_for_endpoint( self.baseline_endpoint, self.create_workload_instance( workload_cls, endpoint=self.baseline_endpoint ), ) else: baseline_result = None for other_endpoint in self.other_endpoints: comparison_outcome = self.run_and_evaluate_workload_for_endpoint( workload_cls, other_endpoint, baseline_result, try_count=0 ) self.result.add_regression(comparison_outcome) def run_and_evaluate_workload_for_endpoint( self, workload_cls: type[Workload], other_endpoint: Endpoint, baseline_result: WorkloadResult | None, try_count: int, ) -> ComparisonOutcome | None: workload_name = workload_cls.__name__ other_endpoint_result = self.run_workload_for_endpoint( other_endpoint, self.create_workload_instance(workload_cls, endpoint=other_endpoint), ) if self.baseline_endpoint is None or baseline_result is None: return None outcome = self.result_analyzer.perform_comparison_in_workload( workload_name, self.baseline_endpoint, other_endpoint, baseline_result, other_endpoint_result, ) if outcome.has_regressions() and try_count < MAX_RETRIES_ON_REGRESSION: print( f"Potential regression in workload {workload_name} at endpoint {other_endpoint}," f" triggering retry {try_count + 1} of {MAX_RETRIES_ON_REGRESSION}" ) return self.run_and_evaluate_workload_for_endpoint( workload_cls, other_endpoint, baseline_result, try_count=try_count + 1 ) return outcome def run_workload_for_endpoint( self, endpoint: Endpoint, workload: Workload, ) -> WorkloadResult: print(f"Running workload {workload.name()} on {endpoint}") df_totals = DfTotals() df_details = DfDetails() concurrencies = self._get_concurrencies() print(f"Concurrencies: {concurrencies}") for concurrency in concurrencies: df_total, df_detail = self.run_workload_for_endpoint_with_concurrency( endpoint, workload, concurrency, self.config.get_count_for_concurrency(concurrency), ) df_totals = concat_df_totals([df_totals, df_total]) df_details = concat_df_details([df_details, df_detail]) endpoint_version_name = endpoint.try_load_version() pathlib.Path(paths.endpoint_dir(endpoint_version_name)).mkdir( parents=True, exist_ok=True ) df_totals.to_csv( paths.df_totals_csv(endpoint_version_name, workload.name()) ) df_details.to_csv( paths.df_details_csv(endpoint_version_name, workload.name()) ) result = WorkloadResult(workload, endpoint, df_totals, df_details) self._record_results(result) return result def run_workload_for_endpoint_with_concurrency( self, endpoint: Endpoint, workload: Workload, concurrency: int, count: int, ) -> tuple[DfTotals, DfDetails]: print( f"Preparing benchmark for workload '{workload.name()}' at concurrency {concurrency} ..." ) endpoint.up() init_sqls = self.schema.init_sqls() init_conn = endpoint.sql_connection() init_conn.autocommit = True init_cursor = init_conn.cursor() for init_sql in init_sqls: print(init_sql) init_cursor.execute(init_sql.encode("utf8")) for init_operation in workload.init_operations(): workload.execute_operation( init_operation, init_cursor, -1, -1, self.config.verbose ) print( f"Creating a cursor pool with {concurrency} entries against endpoint: {endpoint.url()}" ) cursor_pool = self._create_cursor_pool(concurrency, endpoint) print( f"Benchmarking workload '{workload.name()}' at concurrency {concurrency} ..." ) operations = workload.operations() global next_worker_id next_worker_id = 0 local = threading.local() lock = threading.Lock() start = time.time() with futures.ThreadPoolExecutor( concurrency, initializer=self.initialize_worker, initargs=(local, lock) ) as executor: measurements = executor.map( self.execute_operation, [ ( workload, concurrency, local, cursor_pool, operations[i % len(operations)], int(i / len(operations)), ) for i in range(count) ], ) wallclock_total = time.time() - start df_detail = pd.DataFrame(measurements) print("Best and worst individual measurements:") print(df_detail.sort_values(by=[df_details_cols.WALLCLOCK])) print( f"concurrency: {concurrency}; wallclock_total: {wallclock_total}; tps = {count/wallclock_total}" ) df_total = pd.DataFrame( [ { df_totals_cols.CONCURRENCY: concurrency, df_totals_cols.WALLCLOCK: wallclock_total, df_totals_cols.WORKLOAD: workload.name(), df_totals_cols.COUNT: count, df_totals_cols.TPS: count / wallclock_total, df_totals_cols.MEAN_TX_DURATION: df_detail[ df_details_cols.WALLCLOCK ].mean(), df_totals_cols.MEDIAN_TX_DURATION: df_detail[ df_details_cols.WALLCLOCK ].median(), df_totals_cols.MIN_TX_DURATION: df_detail[ df_details_cols.WALLCLOCK ].min(), df_totals_cols.MAX_TX_DURATION: df_detail[ df_details_cols.WALLCLOCK ].max(), } ] ) return DfTotals(df_total), DfDetails(df_detail) def execute_operation( self, args: tuple[Workload, int, threading.local, list[Cursor], Operation, int] ) -> dict[str, Any]: workload, concurrency, local, cursor_pool, operation, transaction_index = args worker_id = local.worker_id assert ( len(cursor_pool) >= worker_id + 1 ), f"len(cursor_pool) is {len(cursor_pool)} but local.worker_id is {worker_id}" cursor = cursor_pool[worker_id] start = time.time() workload.execute_operation( operation, cursor, worker_id, transaction_index, self.config.verbose ) wallclock = time.time() - start return { df_details_cols.CONCURRENCY: concurrency, df_details_cols.WALLCLOCK: wallclock, df_details_cols.OPERATION: type(operation).__name__, df_details_cols.WORKLOAD: workload.name(), df_details_cols.TRANSACTION_INDEX: transaction_index, } def create_workload_instance( self, workload_cls: type[Workload], endpoint: Endpoint ) -> Workload: workload = workload_cls() if isinstance(workload, WorkloadWithContext): workload.set_endpoint(endpoint) workload.set_schema(self.schema) return workload def initialize_worker(self, local: threading.local, lock: threading.Lock): """Give each other worker thread a unique ID""" lock.acquire() global next_worker_id local.worker_id = next_worker_id next_worker_id = next_worker_id + 1 lock.release() def _get_concurrencies(self) -> list[int]: range_end = 1024 if self.config.exponent_base < 2.0 else 32 concurrencies: list[int] = [ round(self.config.exponent_base**c) for c in range(0, range_end) ] concurrencies = sorted(set(concurrencies)) return [ c for c in concurrencies if self.config.min_concurrency <= c <= self.config.max_concurrency ] def _create_cursor_pool(self, concurrency: int, endpoint: Endpoint) -> list[Cursor]: connect_sqls = self.schema.connect_sqls() cursor_pool = [] for i in range(concurrency): conn = endpoint.sql_connection() conn.autocommit = True cursor = conn.cursor() for connect_sql in connect_sqls: cursor.execute(connect_sql.encode("utf8")) cursor_pool.append(cursor) return cursor_pool def _record_results(self, result: WorkloadResult) -> None: endpoint_version_info = result.endpoint.try_load_version() print( f"Collecting results of endpoint {result.endpoint} with name {endpoint_version_info}" ) self.result.append_workload_result(endpoint_version_info, result)
Methods
def create_workload_instance(self, workload_cls: type[Workload], endpoint: Endpoint) ‑> Workload
-
Expand source code Browse git
def create_workload_instance( self, workload_cls: type[Workload], endpoint: Endpoint ) -> Workload: workload = workload_cls() if isinstance(workload, WorkloadWithContext): workload.set_endpoint(endpoint) workload.set_schema(self.schema) return workload
def execute_operation(self, args: tuple[Workload, int, _thread._local, list[psycopg.Cursor], Operation, int]) ‑> dict[str, typing.Any]
-
Expand source code Browse git
def execute_operation( self, args: tuple[Workload, int, threading.local, list[Cursor], Operation, int] ) -> dict[str, Any]: workload, concurrency, local, cursor_pool, operation, transaction_index = args worker_id = local.worker_id assert ( len(cursor_pool) >= worker_id + 1 ), f"len(cursor_pool) is {len(cursor_pool)} but local.worker_id is {worker_id}" cursor = cursor_pool[worker_id] start = time.time() workload.execute_operation( operation, cursor, worker_id, transaction_index, self.config.verbose ) wallclock = time.time() - start return { df_details_cols.CONCURRENCY: concurrency, df_details_cols.WALLCLOCK: wallclock, df_details_cols.OPERATION: type(operation).__name__, df_details_cols.WORKLOAD: workload.name(), df_details_cols.TRANSACTION_INDEX: transaction_index, }
def initialize_worker(self, local: _thread._local, lock:
) -
Give each other worker thread a unique ID
Expand source code Browse git
def initialize_worker(self, local: threading.local, lock: threading.Lock): """Give each other worker thread a unique ID""" lock.acquire() global next_worker_id local.worker_id = next_worker_id next_worker_id = next_worker_id + 1 lock.release()
def run_and_evaluate_workload_for_endpoint(self, workload_cls: type[Workload], other_endpoint: Endpoint, baseline_result: WorkloadResult | None, try_count: int) ‑> ComparisonOutcome | None
-
Expand source code Browse git
def run_and_evaluate_workload_for_endpoint( self, workload_cls: type[Workload], other_endpoint: Endpoint, baseline_result: WorkloadResult | None, try_count: int, ) -> ComparisonOutcome | None: workload_name = workload_cls.__name__ other_endpoint_result = self.run_workload_for_endpoint( other_endpoint, self.create_workload_instance(workload_cls, endpoint=other_endpoint), ) if self.baseline_endpoint is None or baseline_result is None: return None outcome = self.result_analyzer.perform_comparison_in_workload( workload_name, self.baseline_endpoint, other_endpoint, baseline_result, other_endpoint_result, ) if outcome.has_regressions() and try_count < MAX_RETRIES_ON_REGRESSION: print( f"Potential regression in workload {workload_name} at endpoint {other_endpoint}," f" triggering retry {try_count + 1} of {MAX_RETRIES_ON_REGRESSION}" ) return self.run_and_evaluate_workload_for_endpoint( workload_cls, other_endpoint, baseline_result, try_count=try_count + 1 ) return outcome
def run_workload_for_all_endpoints(self, workload_cls: type[Workload])
-
Expand source code Browse git
def run_workload_for_all_endpoints( self, workload_cls: type[Workload], ): if self.baseline_endpoint is not None: baseline_result = self.run_workload_for_endpoint( self.baseline_endpoint, self.create_workload_instance( workload_cls, endpoint=self.baseline_endpoint ), ) else: baseline_result = None for other_endpoint in self.other_endpoints: comparison_outcome = self.run_and_evaluate_workload_for_endpoint( workload_cls, other_endpoint, baseline_result, try_count=0 ) self.result.add_regression(comparison_outcome)
def run_workload_for_endpoint(self, endpoint: Endpoint, workload: Workload) ‑> WorkloadResult
-
Expand source code Browse git
def run_workload_for_endpoint( self, endpoint: Endpoint, workload: Workload, ) -> WorkloadResult: print(f"Running workload {workload.name()} on {endpoint}") df_totals = DfTotals() df_details = DfDetails() concurrencies = self._get_concurrencies() print(f"Concurrencies: {concurrencies}") for concurrency in concurrencies: df_total, df_detail = self.run_workload_for_endpoint_with_concurrency( endpoint, workload, concurrency, self.config.get_count_for_concurrency(concurrency), ) df_totals = concat_df_totals([df_totals, df_total]) df_details = concat_df_details([df_details, df_detail]) endpoint_version_name = endpoint.try_load_version() pathlib.Path(paths.endpoint_dir(endpoint_version_name)).mkdir( parents=True, exist_ok=True ) df_totals.to_csv( paths.df_totals_csv(endpoint_version_name, workload.name()) ) df_details.to_csv( paths.df_details_csv(endpoint_version_name, workload.name()) ) result = WorkloadResult(workload, endpoint, df_totals, df_details) self._record_results(result) return result
def run_workload_for_endpoint_with_concurrency(self, endpoint: Endpoint, workload: Workload, concurrency: int, count: int) ‑> tuple[DfTotals, DfDetails]
-
Expand source code Browse git
def run_workload_for_endpoint_with_concurrency( self, endpoint: Endpoint, workload: Workload, concurrency: int, count: int, ) -> tuple[DfTotals, DfDetails]: print( f"Preparing benchmark for workload '{workload.name()}' at concurrency {concurrency} ..." ) endpoint.up() init_sqls = self.schema.init_sqls() init_conn = endpoint.sql_connection() init_conn.autocommit = True init_cursor = init_conn.cursor() for init_sql in init_sqls: print(init_sql) init_cursor.execute(init_sql.encode("utf8")) for init_operation in workload.init_operations(): workload.execute_operation( init_operation, init_cursor, -1, -1, self.config.verbose ) print( f"Creating a cursor pool with {concurrency} entries against endpoint: {endpoint.url()}" ) cursor_pool = self._create_cursor_pool(concurrency, endpoint) print( f"Benchmarking workload '{workload.name()}' at concurrency {concurrency} ..." ) operations = workload.operations() global next_worker_id next_worker_id = 0 local = threading.local() lock = threading.Lock() start = time.time() with futures.ThreadPoolExecutor( concurrency, initializer=self.initialize_worker, initargs=(local, lock) ) as executor: measurements = executor.map( self.execute_operation, [ ( workload, concurrency, local, cursor_pool, operations[i % len(operations)], int(i / len(operations)), ) for i in range(count) ], ) wallclock_total = time.time() - start df_detail = pd.DataFrame(measurements) print("Best and worst individual measurements:") print(df_detail.sort_values(by=[df_details_cols.WALLCLOCK])) print( f"concurrency: {concurrency}; wallclock_total: {wallclock_total}; tps = {count/wallclock_total}" ) df_total = pd.DataFrame( [ { df_totals_cols.CONCURRENCY: concurrency, df_totals_cols.WALLCLOCK: wallclock_total, df_totals_cols.WORKLOAD: workload.name(), df_totals_cols.COUNT: count, df_totals_cols.TPS: count / wallclock_total, df_totals_cols.MEAN_TX_DURATION: df_detail[ df_details_cols.WALLCLOCK ].mean(), df_totals_cols.MEDIAN_TX_DURATION: df_detail[ df_details_cols.WALLCLOCK ].median(), df_totals_cols.MIN_TX_DURATION: df_detail[ df_details_cols.WALLCLOCK ].min(), df_totals_cols.MAX_TX_DURATION: df_detail[ df_details_cols.WALLCLOCK ].max(), } ] ) return DfTotals(df_total), DfDetails(df_detail)
def run_workloads(self) ‑> BenchmarkResult
-
Expand source code Browse git
def run_workloads( self, ) -> BenchmarkResult: for workload_cls in self.config.workload_classes: assert issubclass( workload_cls, Workload ), f"{workload_cls} is not a Workload" self.run_workload_for_all_endpoints( workload_cls, ) return self.result