Module materialize.scalability.benchmark_result
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from dataclasses import dataclass, field
from typing import TypeVar

from materialize.scalability.df.df_details import DfDetails
from materialize.scalability.df.df_totals import DfTotals, concat_df_totals
from materialize.scalability.regression_outcome import RegressionOutcome
from materialize.scalability.workload_result import WorkloadResult
# Generic value type so the endpoint/workload regrouping helper below can be
# shared between the totals and the details mappings.
T = TypeVar("T")


@dataclass
class BenchmarkResult:
    """Collects scalability benchmark results per endpoint and workload.

    Totals and details are kept in two parallel nested mappings of the form
    ``endpoint version info -> workload name -> data frame``; both are
    populated together by ``append_workload_result``.
    """

    # Accumulated regression outcome; every outcome passed to
    # ``add_regression`` is merged into it.
    overall_regression_outcome: RegressionOutcome = field(
        default_factory=RegressionOutcome
    )
    # endpoint version info -> workload name -> totals data frame
    df_total_by_endpoint_name_and_workload: dict[str, dict[str, DfTotals]] = field(
        default_factory=dict
    )
    # endpoint version info -> workload name -> details data frame
    df_details_by_endpoint_name_and_workload: dict[
        str, dict[str, DfDetails]
    ] = field(default_factory=dict)

    def add_regression(self, regression_outcome: RegressionOutcome | None) -> None:
        """Merge *regression_outcome* into the overall outcome.

        Args:
            regression_outcome: Outcome to merge; ``None`` is ignored.
        """
        if regression_outcome is not None:
            self.overall_regression_outcome.merge(regression_outcome)

    def get_endpoint_names(self) -> list[str]:
        """Return the endpoint names that have results, in insertion order."""
        return list(self.df_total_by_endpoint_name_and_workload)

    def append_workload_result(
        self, endpoint_version_info: str, result: WorkloadResult
    ) -> None:
        """Store the totals and details of *result* under *endpoint_version_info*.

        If an entry for the same endpoint and workload already exists (which
        happens on retries), it is replaced and a notice is printed.

        Args:
            endpoint_version_info: Key identifying the endpoint (and version).
            result: Workload result providing ``workload``, ``df_totals`` and
                ``df_details``.
        """
        # Create each per-endpoint mapping independently so that the totals and
        # details dictionaries cannot get out of sync (e.g. when one of these
        # public attributes was populated directly).
        totals_by_workload = self.df_total_by_endpoint_name_and_workload.setdefault(
            endpoint_version_info, {}
        )
        details_by_workload = (
            self.df_details_by_endpoint_name_and_workload.setdefault(
                endpoint_version_info, {}
            )
        )

        workload_name = result.workload.name()
        if workload_name in totals_by_workload:
            # Entry already exists, this happens in case of retries
            print(
                f"Replacing result entry for endpoint ({endpoint_version_info}) and workload {workload_name}"
            )

        totals_by_workload[workload_name] = result.df_totals
        details_by_workload[workload_name] = result.df_details

    def get_df_total_by_endpoint_name(self, endpoint_name: str) -> DfTotals:
        """Concatenate the totals of all workloads recorded for *endpoint_name*.

        Raises:
            KeyError: If no result was recorded for *endpoint_name*.
        """
        return concat_df_totals(
            list(self.df_total_by_endpoint_name_and_workload[endpoint_name].values())
        )

    def get_df_total_by_workload_and_endpoint(
        self,
    ) -> dict[str, dict[str, DfTotals]]:
        """Return the totals regrouped as workload name -> endpoint -> totals."""
        return self._swap_endpoint_and_workload_grouping(
            self.df_total_by_endpoint_name_and_workload
        )

    def get_df_details_by_workload_and_endpoint(
        self,
    ) -> dict[str, dict[str, DfDetails]]:
        """Return the details regrouped as workload name -> endpoint -> details."""
        return self._swap_endpoint_and_workload_grouping(
            self.df_details_by_endpoint_name_and_workload
        )

    def _swap_endpoint_and_workload_grouping(
        self, result_by_endpoint_and_workload: dict[str, dict[str, T]]
    ) -> dict[str, dict[str, T]]:
        """Re-key ``endpoint -> workload -> value`` as ``workload -> endpoint -> value``.

        The input mapping is not modified; values are shared, not copied.
        """
        result: dict[str, dict[str, T]] = {}
        for endpoint_name, data_by_workload in result_by_endpoint_and_workload.items():
            for workload_name, data in data_by_workload.items():
                result.setdefault(workload_name, {})[endpoint_name] = data
        return result
Classes
class BenchmarkResult
-
BenchmarkResult()
Expand source code Browse git
@dataclass
class BenchmarkResult:
    """Collects scalability benchmark results per endpoint and workload.

    Totals and details are kept in two parallel nested mappings of the form
    ``endpoint version info -> workload name -> data frame``; both are
    populated together by ``append_workload_result``.
    """

    # Accumulated regression outcome; every outcome passed to
    # ``add_regression`` is merged into it.
    overall_regression_outcome: RegressionOutcome = field(
        default_factory=RegressionOutcome
    )
    # endpoint version info -> workload name -> totals data frame
    df_total_by_endpoint_name_and_workload: dict[str, dict[str, DfTotals]] = field(
        default_factory=dict
    )
    # endpoint version info -> workload name -> details data frame
    df_details_by_endpoint_name_and_workload: dict[
        str, dict[str, DfDetails]
    ] = field(default_factory=dict)

    def add_regression(self, regression_outcome: RegressionOutcome | None) -> None:
        """Merge *regression_outcome* into the overall outcome; ``None`` is ignored."""
        if regression_outcome is not None:
            self.overall_regression_outcome.merge(regression_outcome)

    def get_endpoint_names(self) -> list[str]:
        """Return the endpoint names that have results, in insertion order."""
        return list(self.df_total_by_endpoint_name_and_workload)

    def append_workload_result(
        self, endpoint_version_info: str, result: WorkloadResult
    ) -> None:
        """Store the totals and details of *result* under *endpoint_version_info*.

        An existing entry for the same endpoint and workload (retries) is
        replaced and a notice is printed.
        """
        # Create each per-endpoint mapping independently so the totals and
        # details dictionaries cannot get out of sync.
        totals_by_workload = self.df_total_by_endpoint_name_and_workload.setdefault(
            endpoint_version_info, {}
        )
        details_by_workload = (
            self.df_details_by_endpoint_name_and_workload.setdefault(
                endpoint_version_info, {}
            )
        )

        workload_name = result.workload.name()
        if workload_name in totals_by_workload:
            # Entry already exists, this happens in case of retries
            print(
                f"Replacing result entry for endpoint ({endpoint_version_info}) and workload {workload_name}"
            )

        totals_by_workload[workload_name] = result.df_totals
        details_by_workload[workload_name] = result.df_details

    def get_df_total_by_endpoint_name(self, endpoint_name: str) -> DfTotals:
        """Concatenate the totals of all workloads recorded for *endpoint_name*.

        Raises:
            KeyError: If no result was recorded for *endpoint_name*.
        """
        return concat_df_totals(
            list(self.df_total_by_endpoint_name_and_workload[endpoint_name].values())
        )

    def get_df_total_by_workload_and_endpoint(
        self,
    ) -> dict[str, dict[str, DfTotals]]:
        """Return the totals regrouped as workload name -> endpoint -> totals."""
        return self._swap_endpoint_and_workload_grouping(
            self.df_total_by_endpoint_name_and_workload
        )

    def get_df_details_by_workload_and_endpoint(
        self,
    ) -> dict[str, dict[str, DfDetails]]:
        """Return the details regrouped as workload name -> endpoint -> details."""
        return self._swap_endpoint_and_workload_grouping(
            self.df_details_by_endpoint_name_and_workload
        )

    def _swap_endpoint_and_workload_grouping(
        self, result_by_endpoint_and_workload: dict[str, dict[str, T]]
    ) -> dict[str, dict[str, T]]:
        """Re-key ``endpoint -> workload -> value`` as ``workload -> endpoint -> value``."""
        result: dict[str, dict[str, T]] = {}
        for endpoint_name, data_by_workload in result_by_endpoint_and_workload.items():
            for workload_name, data in data_by_workload.items():
                result.setdefault(workload_name, {})[endpoint_name] = data
        return result
Class variables
var df_details_by_endpoint_name_and_workload : dict[str, dict[str, DfDetails]]
var df_total_by_endpoint_name_and_workload : dict[str, dict[str, DfTotals]]
var overall_regression_outcome : RegressionOutcome
Methods
def add_regression(self, regression_outcome: RegressionOutcome | None) -> None
-
Expand source code Browse git
def add_regression(self, regression_outcome: RegressionOutcome | None) -> None:
    """Fold *regression_outcome* into the accumulated overall outcome.

    A ``None`` outcome means there is nothing to record and is skipped.
    """
    if regression_outcome is None:
        return
    self.overall_regression_outcome.merge(regression_outcome)
def append_workload_result(self, endpoint_version_info: str, result: WorkloadResult) -> None
-
Expand source code Browse git
def append_workload_result(
    self, endpoint_version_info: str, result: WorkloadResult
) -> None:
    """Store the totals and details of *result* under *endpoint_version_info*.

    If an entry for the same endpoint and workload already exists (which
    happens on retries), it is replaced and a notice is printed.

    Args:
        endpoint_version_info: Key identifying the endpoint (and version).
        result: Workload result providing ``workload``, ``df_totals`` and
            ``df_details``.
    """
    # Create each per-endpoint mapping independently so that the totals and
    # details dictionaries cannot get out of sync (previously the details
    # mapping was only created when the totals mapping lacked the endpoint).
    totals_by_workload = self.df_total_by_endpoint_name_and_workload.setdefault(
        endpoint_version_info, {}
    )
    details_by_workload = self.df_details_by_endpoint_name_and_workload.setdefault(
        endpoint_version_info, {}
    )

    workload_name = result.workload.name()
    if workload_name in totals_by_workload:
        # Entry already exists, this happens in case of retries
        print(
            f"Replacing result entry for endpoint ({endpoint_version_info}) and workload {workload_name}"
        )

    totals_by_workload[workload_name] = result.df_totals
    details_by_workload[workload_name] = result.df_details
def get_df_details_by_workload_and_endpoint(self) -> dict[str, dict[str, DfDetails]]
-
Expand source code Browse git
def get_df_details_by_workload_and_endpoint(
    self,
) -> dict[str, dict[str, DfDetails]]:
    """Return the stored details regrouped as workload -> endpoint -> details."""
    details_by_endpoint = self.df_details_by_endpoint_name_and_workload
    return self._swap_endpoint_and_workload_grouping(details_by_endpoint)
def get_df_total_by_endpoint_name(self, endpoint_name: str) -> DfTotals
-
Expand source code Browse git
def get_df_total_by_endpoint_name(self, endpoint_name: str) -> DfTotals:
    """Concatenate the totals of every workload recorded for *endpoint_name*.

    Raises:
        KeyError: If nothing was recorded for *endpoint_name*.
    """
    totals_by_workload = self.df_total_by_endpoint_name_and_workload[endpoint_name]
    return concat_df_totals([*totals_by_workload.values()])
def get_df_total_by_workload_and_endpoint(self) -> dict[str, dict[str, DfTotals]]
-
Expand source code Browse git
def get_df_total_by_workload_and_endpoint(
    self,
) -> dict[str, dict[str, DfTotals]]:
    """Return the stored totals regrouped as workload -> endpoint -> totals."""
    totals_by_endpoint = self.df_total_by_endpoint_name_and_workload
    return self._swap_endpoint_and_workload_grouping(totals_by_endpoint)
def get_endpoint_names(self) -> list[str]
-
Expand source code Browse git
def get_endpoint_names(self) -> list[str]:
    """Return the names of all endpoints with recorded results, in insertion order."""
    return [*self.df_total_by_endpoint_name_and_workload]