Module materialize.scalability.benchmark_result

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from dataclasses import dataclass, field
from typing import TypeVar

from materialize.scalability.df.df_details import DfDetails
from materialize.scalability.df.df_totals import DfTotals, concat_df_totals
from materialize.scalability.regression_outcome import RegressionOutcome
from materialize.scalability.workload_result import WorkloadResult

# Generic value type used by BenchmarkResult._swap_endpoint_and_workload_grouping,
# so the regrouping helper works for both the totals and the details mappings.
T = TypeVar("T")


@dataclass
class BenchmarkResult:
    """Aggregated results of a benchmark run.

    Totals and details are recorded per endpoint (keyed by its version info
    string) and, within each endpoint, per workload name. The ``get_*`` helpers
    expose the same data either per endpoint or regrouped by workload first.
    """

    # Merged result of every outcome passed to add_regression().
    overall_regression_outcome: RegressionOutcome = field(
        default_factory=RegressionOutcome
    )
    # endpoint version info -> workload name -> DfTotals
    df_total_by_endpoint_name_and_workload: dict[str, dict[str, DfTotals]] = field(
        default_factory=dict
    )
    # endpoint version info -> workload name -> DfDetails
    df_details_by_endpoint_name_and_workload: dict[str, dict[str, DfDetails]] = field(
        default_factory=dict
    )

    def add_regression(self, regression_outcome: RegressionOutcome | None) -> None:
        """Merge ``regression_outcome`` into ``overall_regression_outcome``.

        ``None`` is accepted and ignored, so callers can pass the result of a
        comparison that may not have produced an outcome.
        """
        if regression_outcome is not None:
            self.overall_regression_outcome.merge(regression_outcome)

    def get_endpoint_names(self) -> list[str]:
        """Return the endpoints for which results were recorded, in insertion order."""
        return list(self.df_total_by_endpoint_name_and_workload)

    def append_workload_result(
        self, endpoint_version_info: str, result: WorkloadResult
    ) -> None:
        """Record the totals and details of ``result`` under ``endpoint_version_info``.

        The entry is keyed by ``result.workload.name()``. If an entry for the same
        endpoint and workload already exists, it is replaced and a message is
        printed.

        Args:
            endpoint_version_info: Key identifying the endpoint the workload ran against.
            result: Workload result whose ``df_totals`` and ``df_details`` are stored.
        """
        # Each mapping gets its own inner dict on first use, independently of
        # the other, so the two mappings cannot get out of step.
        totals_by_workload = self.df_total_by_endpoint_name_and_workload.setdefault(
            endpoint_version_info, {}
        )
        details_by_workload = self.df_details_by_endpoint_name_and_workload.setdefault(
            endpoint_version_info, {}
        )

        workload_name = result.workload.name()
        if workload_name in totals_by_workload:
            # Entry already exists, this happens in case of retries
            print(
                f"Replacing result entry for endpoint ({endpoint_version_info}) and workload {workload_name}"
            )

        totals_by_workload[workload_name] = result.df_totals
        details_by_workload[workload_name] = result.df_details

    def get_df_total_by_endpoint_name(self, endpoint_name: str) -> DfTotals:
        """Concatenate the totals of every workload recorded for ``endpoint_name``.

        Raises:
            KeyError: if no results were recorded for ``endpoint_name``.
        """
        return concat_df_totals(
            list(self.df_total_by_endpoint_name_and_workload[endpoint_name].values())
        )

    def get_df_total_by_workload_and_endpoint(
        self,
    ) -> dict[str, dict[str, DfTotals]]:
        """Return the recorded totals regrouped as workload name -> endpoint -> DfTotals."""
        return self._swap_endpoint_and_workload_grouping(
            self.df_total_by_endpoint_name_and_workload
        )

    def get_df_details_by_workload_and_endpoint(
        self,
    ) -> dict[str, dict[str, DfDetails]]:
        """Return the recorded details regrouped as workload name -> endpoint -> DfDetails."""
        return self._swap_endpoint_and_workload_grouping(
            self.df_details_by_endpoint_name_and_workload
        )

    def _swap_endpoint_and_workload_grouping(
        self, result_by_endpoint_and_workload: dict[str, dict[str, T]]
    ) -> dict[str, dict[str, T]]:
        """Invert ``endpoint -> workload -> value`` into ``workload -> endpoint -> value``.

        A new outer/inner dict structure is built; the values themselves are
        shared with the input, not copied.
        """
        result: dict[str, dict[str, T]] = {}

        for endpoint_name, data_by_workload in result_by_endpoint_and_workload.items():
            for workload_name, data in data_by_workload.items():
                result.setdefault(workload_name, {})[endpoint_name] = data

        return result

Classes

class BenchmarkResult

BenchmarkResult()

Expand source code Browse git
@dataclass
class BenchmarkResult:
    """Aggregated results of a benchmark run.

    Totals and details are recorded per endpoint (keyed by its version info
    string) and, within each endpoint, per workload name. The ``get_*`` helpers
    expose the same data either per endpoint or regrouped by workload first.
    """

    # Merged result of every outcome passed to add_regression().
    overall_regression_outcome: RegressionOutcome
    # endpoint version info -> workload name -> DfTotals
    df_total_by_endpoint_name_and_workload: dict[str, dict[str, DfTotals]]
    # endpoint version info -> workload name -> DfDetails
    df_details_by_endpoint_name_and_workload: dict[str, dict[str, DfDetails]]

    def __init__(self):
        """Create a result with a new RegressionOutcome and no recorded workloads."""
        # @dataclass does not generate __init__ when the class defines one, so
        # this hand-written initializer is what actually sets the fields above.
        self.overall_regression_outcome = RegressionOutcome()
        self.df_total_by_endpoint_name_and_workload = dict()
        self.df_details_by_endpoint_name_and_workload = dict()

    def add_regression(self, regression_outcome: RegressionOutcome | None) -> None:
        """Merge ``regression_outcome`` into ``overall_regression_outcome``; ``None`` is ignored."""
        if regression_outcome is not None:
            self.overall_regression_outcome.merge(regression_outcome)

    def get_endpoint_names(self) -> list[str]:
        """Return the endpoints for which results were recorded, in insertion order."""
        return list(self.df_total_by_endpoint_name_and_workload.keys())

    def append_workload_result(
        self, endpoint_version_info: str, result: WorkloadResult
    ) -> None:
        """Record the totals and details of ``result`` under ``endpoint_version_info``.

        The entry is keyed by ``result.workload.name()``. An existing entry for
        the same endpoint and workload is replaced, and a message is printed.
        """
        # First result for this endpoint: create the inner dicts of both mappings.
        # Only the totals mapping is checked; the two are always created together here.
        if (
            endpoint_version_info
            not in self.df_total_by_endpoint_name_and_workload.keys()
        ):
            self.df_total_by_endpoint_name_and_workload[endpoint_version_info] = dict()
            self.df_details_by_endpoint_name_and_workload[
                endpoint_version_info
            ] = dict()

        workload_name = result.workload.name()
        if (
            workload_name
            in self.df_total_by_endpoint_name_and_workload[endpoint_version_info].keys()
        ):
            # Entry already exists, this happens in case of retries
            print(
                f"Replacing result entry for endpoint ({endpoint_version_info}) and workload {workload_name}"
            )

        self.df_total_by_endpoint_name_and_workload[endpoint_version_info][
            workload_name
        ] = result.df_totals
        self.df_details_by_endpoint_name_and_workload[endpoint_version_info][
            workload_name
        ] = result.df_details

    def get_df_total_by_endpoint_name(self, endpoint_name: str) -> DfTotals:
        """Concatenate the totals of every workload recorded for ``endpoint_name``.

        Raises:
            KeyError: if no results were recorded for ``endpoint_name``.
        """
        return concat_df_totals(
            list(self.df_total_by_endpoint_name_and_workload[endpoint_name].values())
        )

    def get_df_total_by_workload_and_endpoint(
        self,
    ) -> dict[str, dict[str, DfTotals]]:
        """Return the recorded totals regrouped as workload name -> endpoint -> DfTotals."""
        return self._swap_endpoint_and_workload_grouping(
            self.df_total_by_endpoint_name_and_workload
        )

    def get_df_details_by_workload_and_endpoint(
        self,
    ) -> dict[str, dict[str, DfDetails]]:
        """Return the recorded details regrouped as workload name -> endpoint -> DfDetails."""
        return self._swap_endpoint_and_workload_grouping(
            self.df_details_by_endpoint_name_and_workload
        )

    def _swap_endpoint_and_workload_grouping(
        self, result_by_endpoint_and_workload: dict[str, dict[str, T]]
    ) -> dict[str, dict[str, T]]:
        """Invert ``endpoint -> workload -> value`` into ``workload -> endpoint -> value``.

        New dicts are built; the values themselves are shared, not copied.
        """
        result: dict[str, dict[str, T]] = dict()

        for (
            endpoint_name,
            data_by_workload,
        ) in result_by_endpoint_and_workload.items():
            for workload_name, data in data_by_workload.items():
                if workload_name not in result.keys():
                    result[workload_name] = dict()

                result[workload_name][endpoint_name] = data

        return result

Class variables

var df_details_by_endpoint_name_and_workload : dict[str, dict[str, DfDetails]]
var df_total_by_endpoint_name_and_workload : dict[str, dict[str, DfTotals]]
var overall_regression_outcome : RegressionOutcome

Methods

def add_regression(self, regression_outcome: RegressionOutcome | None) -> None
Expand source code Browse git
def add_regression(self, regression_outcome: RegressionOutcome | None) -> None:
    """Merge ``regression_outcome`` into ``self.overall_regression_outcome``.

    ``None`` is accepted and ignored.
    """
    if regression_outcome is not None:
        self.overall_regression_outcome.merge(regression_outcome)
def append_workload_result(self, endpoint_version_info: str, result: WorkloadResult) -> None
Expand source code Browse git
def append_workload_result(
    self, endpoint_version_info: str, result: WorkloadResult
) -> None:
    """Record the totals and details of ``result`` under ``endpoint_version_info``.

    The entry is keyed by ``result.workload.name()``. An existing entry for the
    same endpoint and workload is replaced, and a message is printed.
    """
    # First result for this endpoint: create the inner dicts of both mappings.
    # Only the totals mapping is checked; the two are always created together here.
    if (
        endpoint_version_info
        not in self.df_total_by_endpoint_name_and_workload.keys()
    ):
        self.df_total_by_endpoint_name_and_workload[endpoint_version_info] = dict()
        self.df_details_by_endpoint_name_and_workload[
            endpoint_version_info
        ] = dict()

    workload_name = result.workload.name()
    if (
        workload_name
        in self.df_total_by_endpoint_name_and_workload[endpoint_version_info].keys()
    ):
        # Entry already exists, this happens in case of retries
        print(
            f"Replacing result entry for endpoint ({endpoint_version_info}) and workload {workload_name}"
        )

    self.df_total_by_endpoint_name_and_workload[endpoint_version_info][
        workload_name
    ] = result.df_totals
    self.df_details_by_endpoint_name_and_workload[endpoint_version_info][
        workload_name
    ] = result.df_details
def get_df_details_by_workload_and_endpoint(self) -> dict[str, dict[str, DfDetails]]
Expand source code Browse git
def get_df_details_by_workload_and_endpoint(
    self,
) -> dict[str, dict[str, DfDetails]]:
    """Return the recorded details regrouped as workload name -> endpoint -> DfDetails."""
    return self._swap_endpoint_and_workload_grouping(
        self.df_details_by_endpoint_name_and_workload
    )
def get_df_total_by_endpoint_name(self, endpoint_name: str) -> DfTotals
Expand source code Browse git
def get_df_total_by_endpoint_name(self, endpoint_name: str) -> DfTotals:
    """Concatenate the totals of every workload recorded for ``endpoint_name``.

    Raises:
        KeyError: if no results were recorded for ``endpoint_name``.
    """
    return concat_df_totals(
        list(self.df_total_by_endpoint_name_and_workload[endpoint_name].values())
    )
def get_df_total_by_workload_and_endpoint(self) -> dict[str, dict[str, DfTotals]]
Expand source code Browse git
def get_df_total_by_workload_and_endpoint(
    self,
) -> dict[str, dict[str, DfTotals]]:
    """Return the recorded totals regrouped as workload name -> endpoint -> DfTotals."""
    return self._swap_endpoint_and_workload_grouping(
        self.df_total_by_endpoint_name_and_workload
    )
def get_endpoint_names(self) -> list[str]
Expand source code Browse git
def get_endpoint_names(self) -> list[str]:
    """Return the endpoints for which results were recorded, in insertion order."""
    return list(self.df_total_by_endpoint_name_and_workload.keys())