Module materialize.feature_benchmark.measurement_source
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import re
import textwrap
import time
from collections.abc import Callable
from materialize.feature_benchmark.executor import Executor
# Timestamps are plain float seconds: `Lambda.run` returns `time.time()`, and
# `Td` parses the number that follows "at ts" in testdrive output.
Timestamp = float


class MeasurementSource:
    """Base class for something a feature benchmark can measure.

    Subclasses override `run`, which performs the work under measurement and
    returns the timestamp(s) recorded while doing so.
    """

    def __init__(self) -> None:
        # Executor used by `run` when the caller does not pass one explicitly.
        self._executor: Executor | None = None

    def run(
        self,
        executor: Executor | None = None,
    ) -> None | Timestamp | list[Timestamp]:
        """Execute the measurement and return its timestamp(s).

        Args:
            executor: executor to run on; subclasses may fall back to
                `self._executor` when this is None.

        Raises:
            NotImplementedError: always; subclasses must override this method.
        """
        # `assert False` is stripped under `python -O`, which would make this
        # silently return None; raising keeps the "must override" contract.
        raise NotImplementedError(f"{type(self).__name__} must implement run()")
class Td(MeasurementSource):
    """Use testdrive to run the queries under benchmark and extract the timing information
    out of the testdrive output. The output looks like this:

    > /* A */ CREATE ...
    rows match; continuing at ts 1639561166.4809854
    > /* B */ SELECT ...
    rows didn't match; sleeping to see if dataflow catches up
    rows match; continuing at ts 1639561175.6951854

    So we fish for the /* A */ and /* B */ markers and the timestamps reported for each
    """

    # Extracts the timestamp from a "rows match; continuing at ts <ts>" line.
    _TS_REGEX = re.compile(r"at ts ([0-9.]+)")

    def __init__(self, td_str: str, dedent: bool = True) -> None:
        """
        Args:
            td_str: testdrive script to run; the queries to be timed carry a
                `/* A */` or `/* B */` marker.
            dedent: if true, pass `td_str` through `textwrap.dedent` first.
        """
        super().__init__()
        self._td_str = textwrap.dedent(td_str) if dedent else td_str

    def run(
        self,
        executor: Executor | None = None,
    ) -> list[Timestamp]:
        """Run the testdrive script and return the timestamps of its markers.

        The executor comes either from the argument or from `self._executor`,
        never both.

        Returns:
            The timestamps for `/* A */` and `/* B */`, in that order, omitting
            any marker that does not appear in the output.
        """
        assert not (executor is not None and self._executor is not None)
        executor = executor or self._executor
        assert executor

        # Print each query once so that it is easier to reproduce regressions
        # based on just the logs from CI
        if executor.add_known_fragment(self._td_str):
            print(self._td_str)

        td_output = executor.Td(self._td_str)
        lines = [line for line in td_output.splitlines() if line]

        timestamps = []
        for marker in ["A", "B"]:
            timestamp = self._get_time_for_marker(lines, marker)
            if timestamp is not None:
                timestamps.append(timestamp)
        return timestamps

    def _get_time_for_marker(self, lines: list[str], marker: str) -> None | Timestamp:
        """Return the timestamp testdrive reported for the query tagged `/* marker */`.

        The "rows match" line must directly follow the marker line, optionally
        preceded by a single "rows didn't match" line. If the marker occurs
        more than once, the last occurrence wins.

        Args:
            lines: non-empty lines of testdrive output.
            marker: marker name, e.g. "A" or "B".

        Returns:
            The parsed timestamp, or None if the marker does not occur.

        Raises:
            AssertionError: if the lines after a marker do not have the expected
                shape, or the "rows match" line carries no "at ts" value.
        """
        # Errors are raised explicitly rather than via `assert` so the checks
        # survive `python -O`, while keeping the original exception type.
        tag = f"/* {marker} */"
        matched_line = None
        for idx, line in enumerate(lines):
            if tag not in line:
                continue
            # Slicing never raises, unlike indexing past the end of the output.
            following = lines[idx + 1 : idx + 3]
            if following and "rows match" in following[0]:
                matched_line = following[0]
            elif len(following) == 2 and "rows match" in following[1]:
                if "rows didn't match" not in following[0]:
                    raise AssertionError(
                        f"expected a \"rows didn't match\" line after marker {tag!r}, "
                        f"got {following[0]!r}"
                    )
                matched_line = following[1]
            else:
                raise AssertionError(
                    f"no 'rows match' line found after marker {tag!r} on line {line!r}"
                )

        if matched_line is None:
            # Marker /* ... */ not found
            return None

        match = self._TS_REGEX.search(matched_line)
        if match is None:
            raise AssertionError(f"'at ts' string not found on line '{matched_line}'")
        return float(match.group(1))
class Lambda(MeasurementSource):
    """Execute a lambda, such as Mz restart, within a benchmark() block and record the end timestamp."""

    def __init__(self, _lambda: Callable) -> None:
        # Initialise the base class so `self._executor` exists; without this,
        # calling `run()` with no executor raised AttributeError instead of
        # falling back to the instance executor (or failing the assert).
        super().__init__()
        self._lambda = _lambda

    def run(
        self,
        executor: Executor | None = None,
    ) -> Timestamp:
        """Hand the stored callable to the executor and return the time it finished.

        Args:
            executor: executor to use; falls back to `self._executor`.

        Returns:
            `time.time()` taken after `executor.Lambda(...)` returns.
        """
        e = executor or self._executor
        assert e is not None
        e.Lambda(self._lambda)
        return time.time()
Classes
class Lambda (_lambda: collections.abc.Callable)
-
Expand source code Browse git
class Lambda(MeasurementSource):
    """Execute a lambda, such as Mz restart, within a benchmark() block and record the end timestamp."""

    def __init__(self, _lambda: Callable) -> None:
        # Initialise the base class so `self._executor` exists; without this,
        # calling `run()` with no executor raised AttributeError instead of
        # falling back to the instance executor (or failing the assert).
        super().__init__()
        self._lambda = _lambda

    def run(
        self,
        executor: Executor | None = None,
    ) -> Timestamp:
        """Hand the stored callable to the executor and return the time it finished.

        Args:
            executor: executor to use; falls back to `self._executor`.

        Returns:
            `time.time()` taken after `executor.Lambda(...)` returns.
        """
        e = executor or self._executor
        assert e is not None
        e.Lambda(self._lambda)
        return time.time()
Ancestors: MeasurementSource
Methods
def run(self, executor: Executor | None = None) -> float
-
Expand source code Browse git
def run(
    self,
    executor: Executor | None = None,
) -> Timestamp:
    """Invoke the wrapped callable through an executor and return when it finished.

    The `executor` argument is used when given; otherwise `self._executor`.
    """
    chosen = executor or self._executor
    assert chosen is not None
    chosen.Lambda(self._lambda)
    finished_at = time.time()
    return finished_at
class MeasurementSource
-
Expand source code Browse git
class MeasurementSource:
    """Base class for something a feature benchmark can measure.

    Subclasses override `run`, which performs the work under measurement and
    returns the timestamp(s) recorded while doing so.
    """

    def __init__(self) -> None:
        # Executor used by `run` when the caller does not pass one explicitly.
        self._executor: Executor | None = None

    def run(
        self,
        executor: Executor | None = None,
    ) -> None | Timestamp | list[Timestamp]:
        """Execute the measurement and return its timestamp(s).

        Raises:
            NotImplementedError: always; subclasses must override this method.
        """
        # `assert False` is stripped under `python -O`, which would make this
        # silently return None; raising keeps the "must override" contract.
        raise NotImplementedError(f"{type(self).__name__} must implement run()")
Subclasses: Lambda, Td
Methods
def run(self, executor: Executor | None = None) -> None | float | list[float]
-
Expand source code Browse git
def run(
    self,
    executor: Executor | None = None,
) -> None | Timestamp | list[Timestamp]:
    """Execute the measurement and return its timestamp(s).

    Raises:
        NotImplementedError: always; subclasses must override this method.
    """
    # `assert False` is stripped under `python -O`, which would make this
    # silently return None; raising keeps the "must override" contract.
    raise NotImplementedError(f"{type(self).__name__} must implement run()")
class Td (td_str: str, dedent: bool = True)
-
Use testdrive to run the queries under benchmark and extract the timing information out of the testdrive output. The output looks like this:
> /* A */ CREATE ...
rows match; continuing at ts 1639561166.4809854
> /* B */ SELECT ...
rows didn't match; sleeping to see if dataflow catches up
rows match; continuing at ts 1639561175.6951854
So we fish for the /* A */ and /* B */ markers and the timestamps reported for each.
Expand source code Browse git
class Td(MeasurementSource):
    """Use testdrive to run the queries under benchmark and extract the timing information
    out of the testdrive output. The output looks like this:

    > /* A */ CREATE ...
    rows match; continuing at ts 1639561166.4809854
    > /* B */ SELECT ...
    rows didn't match; sleeping to see if dataflow catches up
    rows match; continuing at ts 1639561175.6951854

    So we fish for the /* A */ and /* B */ markers and the timestamps reported for each
    """

    # Extracts the timestamp from a "rows match; continuing at ts <ts>" line.
    _TS_REGEX = re.compile(r"at ts ([0-9.]+)")

    def __init__(self, td_str: str, dedent: bool = True) -> None:
        """
        Args:
            td_str: testdrive script to run; the queries to be timed carry a
                `/* A */` or `/* B */` marker.
            dedent: if true, pass `td_str` through `textwrap.dedent` first.
        """
        super().__init__()
        self._td_str = textwrap.dedent(td_str) if dedent else td_str

    def run(
        self,
        executor: Executor | None = None,
    ) -> list[Timestamp]:
        """Run the testdrive script and return the timestamps of its markers.

        The executor comes either from the argument or from `self._executor`,
        never both.

        Returns:
            The timestamps for `/* A */` and `/* B */`, in that order, omitting
            any marker that does not appear in the output.
        """
        assert not (executor is not None and self._executor is not None)
        executor = executor or self._executor
        assert executor

        # Print each query once so that it is easier to reproduce regressions
        # based on just the logs from CI
        if executor.add_known_fragment(self._td_str):
            print(self._td_str)

        td_output = executor.Td(self._td_str)
        lines = [line for line in td_output.splitlines() if line]

        timestamps = []
        for marker in ["A", "B"]:
            timestamp = self._get_time_for_marker(lines, marker)
            if timestamp is not None:
                timestamps.append(timestamp)
        return timestamps

    def _get_time_for_marker(self, lines: list[str], marker: str) -> None | Timestamp:
        """Return the timestamp testdrive reported for the query tagged `/* marker */`.

        The "rows match" line must directly follow the marker line, optionally
        preceded by a single "rows didn't match" line. If the marker occurs
        more than once, the last occurrence wins.

        Args:
            lines: non-empty lines of testdrive output.
            marker: marker name, e.g. "A" or "B".

        Returns:
            The parsed timestamp, or None if the marker does not occur.

        Raises:
            AssertionError: if the lines after a marker do not have the expected
                shape, or the "rows match" line carries no "at ts" value.
        """
        # Errors are raised explicitly rather than via `assert` so the checks
        # survive `python -O`, while keeping the original exception type.
        tag = f"/* {marker} */"
        matched_line = None
        for idx, line in enumerate(lines):
            if tag not in line:
                continue
            # Slicing never raises, unlike indexing past the end of the output.
            following = lines[idx + 1 : idx + 3]
            if following and "rows match" in following[0]:
                matched_line = following[0]
            elif len(following) == 2 and "rows match" in following[1]:
                if "rows didn't match" not in following[0]:
                    raise AssertionError(
                        f"expected a \"rows didn't match\" line after marker {tag!r}, "
                        f"got {following[0]!r}"
                    )
                matched_line = following[1]
            else:
                raise AssertionError(
                    f"no 'rows match' line found after marker {tag!r} on line {line!r}"
                )

        if matched_line is None:
            # Marker /* ... */ not found
            return None

        match = self._TS_REGEX.search(matched_line)
        if match is None:
            raise AssertionError(f"'at ts' string not found on line '{matched_line}'")
        return float(match.group(1))
Ancestors: MeasurementSource
Methods
def run(self, executor: Executor | None = None) -> list[float]
-
Expand source code Browse git
def run(
    self,
    executor: Executor | None = None,
) -> list[Timestamp]:
    """Run the testdrive script; return the /* A */ and /* B */ timestamps found.

    The executor comes from the argument or from `self._executor`, never both.
    Markers missing from the output are simply left out of the result.
    """
    # Callers may supply the executor here or on the instance, but not both.
    assert executor is None or self._executor is None
    executor = executor or self._executor
    assert executor

    # Print each query once so that it is easier to reproduce regressions
    # based on just the logs from CI
    is_new_fragment = executor.add_known_fragment(self._td_str)
    if is_new_fragment:
        print(self._td_str)

    non_empty = list(filter(None, executor.Td(self._td_str).splitlines()))
    found = (self._get_time_for_marker(non_empty, tag) for tag in ("A", "B"))
    return [ts for ts in found if ts is not None]