Module materialize.feature_benchmark.benchmark
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import sys
from collections.abc import Iterable
from materialize import ui
from materialize.feature_benchmark.aggregation import Aggregation
from materialize.feature_benchmark.comparator import Comparator
from materialize.feature_benchmark.executor import Executor
from materialize.feature_benchmark.filter import Filter
from materialize.feature_benchmark.measurement import Measurement, MeasurementType
from materialize.feature_benchmark.scenario import Scenario
from materialize.feature_benchmark.termination import TerminationCondition
from materialize.mz_version import MzVersion
class Benchmark:
def __init__(
self,
mz_id: int,
mz_version: MzVersion,
scenario: type[Scenario],
executor: Executor,
filter: Filter,
termination_conditions: list[TerminationCondition],
aggregation_class: type[Aggregation],
default_size: int,
seed: int,
scale: str | None = None,
measure_memory: bool = True,
) -> None:
self._scale = scale
self._mz_id = mz_id
self._mz_version = mz_version
self._scenario = scenario
self._executor = executor
self._filter = filter
self._termination_conditions = termination_conditions
self._performance_aggregation = aggregation_class()
self._messages_aggregation = aggregation_class()
self._default_size = default_size
self._seed = seed
if measure_memory:
self._memory_aggregation = aggregation_class()
def run(self) -> list[Aggregation]:
scale = self._scenario.SCALE
if self._scale and not self._scenario.FIXED_SCALE:
if self._scale.startswith("+"):
scale = scale + float(self._scale.lstrip("+"))
elif self._scale.startswith("-"):
scale = scale - float(self._scale.lstrip("-"))
elif float(self._scale) > 0:
scale = float(self._scale)
scenario_class = self._scenario
scenario = scenario_class(
scale=scale,
mz_version=self._mz_version,
default_size=self._default_size,
seed=self._seed,
)
name = scenario.name()
ui.header(
f"Running scenario {name}, scale = {scenario.scale()}, N = {scenario.n()}"
)
# Run the shared() section once for both Mzs under measurement
shared = scenario.shared()
if self._mz_id == 0 and shared is not None:
print(
f"Running the shared() section for scenario {name} with {self._mz_version} ..."
)
for shared_item in shared if isinstance(shared, list) else [shared]:
shared_item.run(executor=self._executor)
print("shared() done")
# Run the init() section once for each Mz
init = scenario.init()
if init is not None:
print(
f"Running the init() section for scenario {name} with {self._mz_version} ..."
)
for init_item in init if isinstance(init, list) else [init]:
init_item.run(executor=self._executor)
print("init() done")
for i in range(sys.maxsize):
# Run the before() section once for each measurement
print(
f"Running the before() section for scenario {name} with {self._mz_version} ..."
)
before = scenario.before()
if before is not None:
for before_item in before if isinstance(before, list) else [before]:
before_item.run(executor=self._executor)
print(
f"Running the benchmark for scenario {name} with {self._mz_version} ..."
)
# Collect timestamps from any part of the workload being benchmarked
timestamps = []
benchmark = scenario.benchmark()
for benchmark_item in (
benchmark if isinstance(benchmark, list) else [benchmark]
):
item_timestamps = benchmark_item.run(executor=self._executor)
if item_timestamps:
timestamps.extend(
item_timestamps
if isinstance(item_timestamps, list)
else [item_timestamps]
)
assert (
len(timestamps) == 2
), f"benchmark() did not return exactly 2 timestamps: scenario: {scenario}, timestamps: {timestamps}"
assert (
timestamps[1] >= timestamps[0]
), f"Second timestamp reported not greater than first: scenario: {scenario}, timestamps: {timestamps}"
performance_measurement = Measurement(
type=MeasurementType.WALLCLOCK,
value=timestamps[1] - timestamps[0],
)
if not self._filter or not self._filter.filter(performance_measurement):
print(f"{i} {performance_measurement}")
self._performance_aggregation.append(performance_measurement)
messages = self._executor.Messages()
if messages is not None:
messages_measurement = Measurement(
type=MeasurementType.MESSAGES, value=messages
)
print(f"{i}: {messages_measurement}")
self._messages_aggregation.append(messages_measurement)
if self._memory_aggregation:
memory_measurement = Measurement(
type=MeasurementType.MEMORY,
value=self._executor.DockerMem() / 2**20, # Convert to Mb
)
if memory_measurement.value > 0:
if not self._filter or not self._filter.filter(memory_measurement):
print(f"{i} {memory_measurement}")
self._memory_aggregation.append(memory_measurement)
for termination_condition in self._termination_conditions:
if termination_condition.terminate(performance_measurement):
return [
self._performance_aggregation,
self._messages_aggregation,
self._memory_aggregation,
]
assert False, "unreachable"
class Report:
def __init__(self) -> None:
self._comparisons: list[Comparator] = []
def append(self, comparison: Comparator) -> None:
self._comparisons.append(comparison)
def extend(self, comparisons: Iterable[Comparator]) -> None:
self._comparisons.extend(comparisons)
def __str__(self) -> str:
output_lines = []
output_lines.append(
f"{'NAME':<35} | {'TYPE':<9} | {'THIS':^15} | {'OTHER':^15} | {'Regression?':^13} | 'THIS' is:"
)
output_lines.append("-" * 100)
for comparison in self._comparisons:
regression = "!!YES!!" if comparison.is_regression() else "no"
output_lines.append(
f"{comparison.name:<35} | {comparison.type:<9} | {comparison.this_as_str():>15} | {comparison.other_as_str():>15} | {regression:^13} | {comparison.human_readable()}"
)
return "\n".join(output_lines)
class SingleReport(Report):
def __str__(self) -> str:
output_lines = []
output_lines.append(f"{'NAME':<25} | {'THIS':^11}")
output_lines.append("-" * 50)
for comparison in self._comparisons:
output_lines.append(f"{comparison.name:<25} | {comparison.this():>11.3f}")
return "\n".join(output_lines)
Classes
class Benchmark (mz_id: int, mz_version: MzVersion, scenario: type[Scenario], executor: Executor, filter: Filter, termination_conditions: list[TerminationCondition], aggregation_class: type[Aggregation], default_size: int, seed: int, scale: str | None = None, measure_memory: bool = True)
-
Expand source code Browse git
class Benchmark: def __init__( self, mz_id: int, mz_version: MzVersion, scenario: type[Scenario], executor: Executor, filter: Filter, termination_conditions: list[TerminationCondition], aggregation_class: type[Aggregation], default_size: int, seed: int, scale: str | None = None, measure_memory: bool = True, ) -> None: self._scale = scale self._mz_id = mz_id self._mz_version = mz_version self._scenario = scenario self._executor = executor self._filter = filter self._termination_conditions = termination_conditions self._performance_aggregation = aggregation_class() self._messages_aggregation = aggregation_class() self._default_size = default_size self._seed = seed if measure_memory: self._memory_aggregation = aggregation_class() def run(self) -> list[Aggregation]: scale = self._scenario.SCALE if self._scale and not self._scenario.FIXED_SCALE: if self._scale.startswith("+"): scale = scale + float(self._scale.lstrip("+")) elif self._scale.startswith("-"): scale = scale - float(self._scale.lstrip("-")) elif float(self._scale) > 0: scale = float(self._scale) scenario_class = self._scenario scenario = scenario_class( scale=scale, mz_version=self._mz_version, default_size=self._default_size, seed=self._seed, ) name = scenario.name() ui.header( f"Running scenario {name}, scale = {scenario.scale()}, N = {scenario.n()}" ) # Run the shared() section once for both Mzs under measurement shared = scenario.shared() if self._mz_id == 0 and shared is not None: print( f"Running the shared() section for scenario {name} with {self._mz_version} ..." ) for shared_item in shared if isinstance(shared, list) else [shared]: shared_item.run(executor=self._executor) print("shared() done") # Run the init() section once for each Mz init = scenario.init() if init is not None: print( f"Running the init() section for scenario {name} with {self._mz_version} ..." ) for init_item in init if isinstance(init, list) else [init]: init_item.run(executor=self._executor) print("init() done") for i in range(sys.maxsize): # Run the before() section once for each measurement print( f"Running the before() section for scenario {name} with {self._mz_version} ..." ) before = scenario.before() if before is not None: for before_item in before if isinstance(before, list) else [before]: before_item.run(executor=self._executor) print( f"Running the benchmark for scenario {name} with {self._mz_version} ..." ) # Collect timestamps from any part of the workload being benchmarked timestamps = [] benchmark = scenario.benchmark() for benchmark_item in ( benchmark if isinstance(benchmark, list) else [benchmark] ): item_timestamps = benchmark_item.run(executor=self._executor) if item_timestamps: timestamps.extend( item_timestamps if isinstance(item_timestamps, list) else [item_timestamps] ) assert ( len(timestamps) == 2 ), f"benchmark() did not return exactly 2 timestamps: scenario: {scenario}, timestamps: {timestamps}" assert ( timestamps[1] >= timestamps[0] ), f"Second timestamp reported not greater than first: scenario: {scenario}, timestamps: {timestamps}" performance_measurement = Measurement( type=MeasurementType.WALLCLOCK, value=timestamps[1] - timestamps[0], ) if not self._filter or not self._filter.filter(performance_measurement): print(f"{i} {performance_measurement}") self._performance_aggregation.append(performance_measurement) messages = self._executor.Messages() if messages is not None: messages_measurement = Measurement( type=MeasurementType.MESSAGES, value=messages ) print(f"{i}: {messages_measurement}") self._messages_aggregation.append(messages_measurement) if self._memory_aggregation: memory_measurement = Measurement( type=MeasurementType.MEMORY, value=self._executor.DockerMem() / 2**20, # Convert to Mb ) if memory_measurement.value > 0: if not self._filter or not self._filter.filter(memory_measurement): print(f"{i} {memory_measurement}") self._memory_aggregation.append(memory_measurement) for termination_condition in self._termination_conditions: if termination_condition.terminate(performance_measurement): return [ self._performance_aggregation, self._messages_aggregation, self._memory_aggregation, ] assert False, "unreachable"
Methods
def run(self) ‑> list[Aggregation]
-
Expand source code Browse git
def run(self) -> list[Aggregation]: scale = self._scenario.SCALE if self._scale and not self._scenario.FIXED_SCALE: if self._scale.startswith("+"): scale = scale + float(self._scale.lstrip("+")) elif self._scale.startswith("-"): scale = scale - float(self._scale.lstrip("-")) elif float(self._scale) > 0: scale = float(self._scale) scenario_class = self._scenario scenario = scenario_class( scale=scale, mz_version=self._mz_version, default_size=self._default_size, seed=self._seed, ) name = scenario.name() ui.header( f"Running scenario {name}, scale = {scenario.scale()}, N = {scenario.n()}" ) # Run the shared() section once for both Mzs under measurement shared = scenario.shared() if self._mz_id == 0 and shared is not None: print( f"Running the shared() section for scenario {name} with {self._mz_version} ..." ) for shared_item in shared if isinstance(shared, list) else [shared]: shared_item.run(executor=self._executor) print("shared() done") # Run the init() section once for each Mz init = scenario.init() if init is not None: print( f"Running the init() section for scenario {name} with {self._mz_version} ..." ) for init_item in init if isinstance(init, list) else [init]: init_item.run(executor=self._executor) print("init() done") for i in range(sys.maxsize): # Run the before() section once for each measurement print( f"Running the before() section for scenario {name} with {self._mz_version} ..." ) before = scenario.before() if before is not None: for before_item in before if isinstance(before, list) else [before]: before_item.run(executor=self._executor) print( f"Running the benchmark for scenario {name} with {self._mz_version} ..." ) # Collect timestamps from any part of the workload being benchmarked timestamps = [] benchmark = scenario.benchmark() for benchmark_item in ( benchmark if isinstance(benchmark, list) else [benchmark] ): item_timestamps = benchmark_item.run(executor=self._executor) if item_timestamps: timestamps.extend( item_timestamps if isinstance(item_timestamps, list) else [item_timestamps] ) assert ( len(timestamps) == 2 ), f"benchmark() did not return exactly 2 timestamps: scenario: {scenario}, timestamps: {timestamps}" assert ( timestamps[1] >= timestamps[0] ), f"Second timestamp reported not greater than first: scenario: {scenario}, timestamps: {timestamps}" performance_measurement = Measurement( type=MeasurementType.WALLCLOCK, value=timestamps[1] - timestamps[0], ) if not self._filter or not self._filter.filter(performance_measurement): print(f"{i} {performance_measurement}") self._performance_aggregation.append(performance_measurement) messages = self._executor.Messages() if messages is not None: messages_measurement = Measurement( type=MeasurementType.MESSAGES, value=messages ) print(f"{i}: {messages_measurement}") self._messages_aggregation.append(messages_measurement) if self._memory_aggregation: memory_measurement = Measurement( type=MeasurementType.MEMORY, value=self._executor.DockerMem() / 2**20, # Convert to Mb ) if memory_measurement.value > 0: if not self._filter or not self._filter.filter(memory_measurement): print(f"{i} {memory_measurement}") self._memory_aggregation.append(memory_measurement) for termination_condition in self._termination_conditions: if termination_condition.terminate(performance_measurement): return [ self._performance_aggregation, self._messages_aggregation, self._memory_aggregation, ] assert False, "unreachable"
class Report
-
Expand source code Browse git
class Report: def __init__(self) -> None: self._comparisons: list[Comparator] = [] def append(self, comparison: Comparator) -> None: self._comparisons.append(comparison) def extend(self, comparisons: Iterable[Comparator]) -> None: self._comparisons.extend(comparisons) def __str__(self) -> str: output_lines = [] output_lines.append( f"{'NAME':<35} | {'TYPE':<9} | {'THIS':^15} | {'OTHER':^15} | {'Regression?':^13} | 'THIS' is:" ) output_lines.append("-" * 100) for comparison in self._comparisons: regression = "!!YES!!" if comparison.is_regression() else "no" output_lines.append( f"{comparison.name:<35} | {comparison.type:<9} | {comparison.this_as_str():>15} | {comparison.other_as_str():>15} | {regression:^13} | {comparison.human_readable()}" ) return "\n".join(output_lines)
Subclasses
Methods
def append(self, comparison: Comparator) ‑> None
-
Expand source code Browse git
def append(self, comparison: Comparator) -> None: self._comparisons.append(comparison)
def extend(self, comparisons: collections.abc.Iterable[Comparator]) ‑> None
-
Expand source code Browse git
def extend(self, comparisons: Iterable[Comparator]) -> None: self._comparisons.extend(comparisons)
class SingleReport
-
Expand source code Browse git
class SingleReport(Report): def __str__(self) -> str: output_lines = [] output_lines.append(f"{'NAME':<25} | {'THIS':^11}") output_lines.append("-" * 50) for comparison in self._comparisons: output_lines.append(f"{comparison.name:<25} | {comparison.this():>11.3f}") return "\n".join(output_lines)
Ancestors