Module materialize.feature_benchmark.executor

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import time
from collections.abc import Callable
from textwrap import dedent
from typing import Any

from materialize.mzcompose.composition import Composition
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.mysql import MySql


class Executor:
    _known_fragments: set[str] = set()

    def Lambda(self, _lambda: Callable[["Executor"], float]) -> float:
        return _lambda(self)

    def Td(self, input: str) -> Any:
        raise NotImplementedError

    def Kgen(self, topic: str, args: list[str]) -> Any:
        raise NotImplementedError

    def add_known_fragment(self, fragment: str) -> bool:
        """
        Record whether a TD fragment has been printed already. Returns true
        if it wasn't added before.
        """
        result = fragment not in self._known_fragments
        self._known_fragments.add(fragment)
        return result

    def DockerMem(self) -> int:
        raise NotImplementedError

    def Messages(self) -> int | None:
        raise NotImplementedError


class Docker(Executor):
    """Executor that runs every operation through an mzcompose ``Composition``."""

    def __init__(
        self, composition: Composition, seed: int, materialized: Materialized
    ) -> None:
        """Store the composition, the seed and the Materialized service definition."""
        self._composition = composition
        self._seed = seed
        self._materialized = materialized

    def RestartMz(self) -> None:
        """Kill the "materialized" service and bring it up again."""
        self._composition.kill("materialized")
        # Make sure we are restarting Materialized() with the
        # same parameters (docker tag, SIZE) it was initially started with
        with self._composition.override(self._materialized):
            self._composition.up("materialized")

    def Td(self, input: str) -> Any:
        """Feed ``input`` to testdrive via the composition and return the captured stdout."""
        return self._composition.exec(
            "testdrive",
            "--no-reset",
            f"--seed={self._seed}",
            "--initial-backoff=10ms",  # Retry every 10ms until success
            "--backoff-factor=0",
            "--consistency-checks=disable",
            f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
            stdin=input,
            capture=True,
        ).stdout

    def Kgen(self, topic: str, args: list[str]) -> Any:
        """Run "kgen" with ``--topic=testdrive-{topic}-{seed}`` followed by ``args``."""
        return self._composition.run(
            "kgen", f"--topic=testdrive-{topic}-{self._seed}", *args
        )

    def DockerMem(self) -> int:
        """Return what ``Composition.mem`` reports for the "materialized" service."""
        return self._composition.mem("materialized")

    def Messages(self) -> int | None:
        """Return the sum of all messages in the system from mz_internal.mz_message_counts_per_worker

        The count is polled up to 50 times, 0.1 seconds apart. From the fourth
        poll onwards a reading is returned as soon as it differs from the
        previous one by less than 5%. None is returned when, from the fourth
        poll onwards, the query yields no count, or when the readings never
        converge.
        """

        def one_count() -> int | None:
            # Single reading: SUM(sent) for channels whose source operator
            # belongs to a dataflow that is neither a one-shot select nor a
            # subscribe.
            result = self._composition.sql_query(
                dedent(
                    """
                    SELECT SUM(sent) as cnt
                    FROM
                        mz_internal.mz_message_counts_per_worker mc,
                        mz_internal.mz_dataflow_channel_operators_per_worker c
                    WHERE
                        c.id = mc.channel_id AND
                        c.worker_id = mc.from_worker_id AND
                        from_operator_id IN (
                            SELECT dod.id
                            FROM mz_internal.mz_dataflow_operator_dataflows dod
                            WHERE dod.dataflow_name NOT LIKE '%oneshot-select%'
                            AND dod.dataflow_name NOT LIKE '%subscribe%'
                        )
                    """
                )
            )
            # No row, or a NULL sum, both mean "no data".
            if len(result) == 0 or result[0][0] is None:
                return None
            return int(result[0][0])

        # Loop until the message count converges
        prev_count: int | None = None
        for i in range(50):
            new_count = one_count()
            if new_count is not None and prev_count is not None:
                low, high = sorted((prev_count, new_count))
                if low == high:
                    # Identical readings (including 0 and 0) have converged.
                    pct = 0.0
                elif low == 0:
                    # Going from/to zero is an unbounded relative change;
                    # dividing by zero here used to raise ZeroDivisionError.
                    pct = float("inf")
                else:
                    pct = high / low - 1
                # It has converged
                if pct < 0.05 and i > 2:
                    return new_count

            # No message count data available
            if new_count is None and i > 2:
                return None

            prev_count = new_count
            time.sleep(0.1)

        return None


class MzCloud(Executor):
    """Executor that points testdrive at an explicitly given Materialize URL.

    Kafka (port 9092) and the schema registry (port 8081) are addressed
    through ``external_addr``.
    """

    def __init__(
        self,
        composition: Composition,
        seed: int,
        mzcloud_url: str,
        external_addr: str,
    ) -> None:
        """Store the connection settings and precompute the shared testdrive arguments."""
        self._composition = composition
        self._seed = seed
        self._mzcloud_url = mzcloud_url
        self._external_addr = external_addr
        # Arguments common to every testdrive invocation of this executor.
        self._testdrive_args = [
            f"--materialize-url={self._mzcloud_url}",
            f"--kafka-addr={self._external_addr}:9092",
            f"--schema-registry-url=http://{self._external_addr}:8081",
            f"--seed={self._seed}",
            f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
        ]

    def RestartMz(self) -> None:
        """Always fail: restarting is not possible for this executor.

        Raises:
            AssertionError: on every call.
        """
        # We can't restart the cloud, so complain. Raised explicitly because
        # a bare `assert False` is removed when Python runs with -O.
        raise AssertionError("MzCloud cannot restart Materialize")

    def Reset(self) -> None:
        """Run testdrive with empty input and a 10s default timeout.

        NOTE(review): ``--no-reset`` is not passed here, so presumably this is
        what lets testdrive perform its own reset -- confirm against testdrive.
        """
        print("resetting")
        self._composition.exec(
            "testdrive",
            *self._testdrive_args,
            # Use a lower timeout so we complain if the mzcloud_url was wrong or inaccessible.
            "--default-timeout=10s",
            stdin="",
        )
        print("reset done")

    def Td(self, input: str) -> Any:
        """Feed ``input`` to testdrive via the composition and return the captured stdout."""
        return self._composition.exec(
            "testdrive",
            "--no-reset",
            *self._testdrive_args,
            "--initial-backoff=10ms",
            "--backoff-factor=0",
            "--consistency-checks=disable",
            stdin=input,
            capture=True,
        ).stdout

    def Kgen(self, topic: str, args: list[str]) -> Any:
        """Always fail: kgen is not implemented for this executor yet.

        Raises:
            AssertionError: on every call.
        """
        # TODO: Implement
        # Raised explicitly because a bare `assert False` is removed under -O.
        raise AssertionError("MzCloud.Kgen is not implemented")

Classes

class Docker (composition: Composition, seed: int, materialized: Materialized)
Expand source code Browse git
class Docker(Executor):
    """Executor that runs every operation through an mzcompose ``Composition``."""

    def __init__(
        self, composition: Composition, seed: int, materialized: Materialized
    ) -> None:
        """Store the composition, the seed and the Materialized service definition."""
        self._composition = composition
        self._seed = seed
        self._materialized = materialized

    def RestartMz(self) -> None:
        """Kill the "materialized" service and bring it up again."""
        self._composition.kill("materialized")
        # Make sure we are restarting Materialized() with the
        # same parameters (docker tag, SIZE) it was initially started with
        with self._composition.override(self._materialized):
            self._composition.up("materialized")

    def Td(self, input: str) -> Any:
        """Feed ``input`` to testdrive via the composition and return the captured stdout."""
        return self._composition.exec(
            "testdrive",
            "--no-reset",
            f"--seed={self._seed}",
            "--initial-backoff=10ms",  # Retry every 10ms until success
            "--backoff-factor=0",
            "--consistency-checks=disable",
            f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
            stdin=input,
            capture=True,
        ).stdout

    def Kgen(self, topic: str, args: list[str]) -> Any:
        """Run "kgen" with ``--topic=testdrive-{topic}-{seed}`` followed by ``args``."""
        return self._composition.run(
            "kgen", f"--topic=testdrive-{topic}-{self._seed}", *args
        )

    def DockerMem(self) -> int:
        """Return what ``Composition.mem`` reports for the "materialized" service."""
        return self._composition.mem("materialized")

    def Messages(self) -> int | None:
        """Return the sum of all messages in the system from mz_internal.mz_message_counts_per_worker

        The count is polled up to 50 times, 0.1 seconds apart. From the fourth
        poll onwards a reading is returned as soon as it differs from the
        previous one by less than 5%. None is returned when, from the fourth
        poll onwards, the query yields no count, or when the readings never
        converge.
        """

        def one_count() -> int | None:
            # Single reading: SUM(sent) for channels whose source operator
            # belongs to a dataflow that is neither a one-shot select nor a
            # subscribe.
            result = self._composition.sql_query(
                dedent(
                    """
                    SELECT SUM(sent) as cnt
                    FROM
                        mz_internal.mz_message_counts_per_worker mc,
                        mz_internal.mz_dataflow_channel_operators_per_worker c
                    WHERE
                        c.id = mc.channel_id AND
                        c.worker_id = mc.from_worker_id AND
                        from_operator_id IN (
                            SELECT dod.id
                            FROM mz_internal.mz_dataflow_operator_dataflows dod
                            WHERE dod.dataflow_name NOT LIKE '%oneshot-select%'
                            AND dod.dataflow_name NOT LIKE '%subscribe%'
                        )
                    """
                )
            )
            # No row, or a NULL sum, both mean "no data".
            if len(result) == 0 or result[0][0] is None:
                return None
            return int(result[0][0])

        # Loop until the message count converges
        prev_count: int | None = None
        for i in range(50):
            new_count = one_count()
            if new_count is not None and prev_count is not None:
                low, high = sorted((prev_count, new_count))
                if low == high:
                    # Identical readings (including 0 and 0) have converged.
                    pct = 0.0
                elif low == 0:
                    # Going from/to zero is an unbounded relative change;
                    # dividing by zero here used to raise ZeroDivisionError.
                    pct = float("inf")
                else:
                    pct = high / low - 1
                # It has converged
                if pct < 0.05 and i > 2:
                    return new_count

            # No message count data available
            if new_count is None and i > 2:
                return None

            prev_count = new_count
            time.sleep(0.1)

        return None

Ancestors

Methods

def DockerMem(self) ‑> int
Expand source code Browse git
def DockerMem(self) -> int:
    """Return what the composition's ``mem`` call reports for "materialized"."""
    service_mem = self._composition.mem("materialized")
    return service_mem
def Kgen(self, topic: str, args: list[str]) ‑> Any
Expand source code Browse git
def Kgen(self, topic: str, args: list[str]) -> Any:
    """Run "kgen" through the composition and return the result of that call.

    The topic flag is ``--topic=testdrive-{topic}-{seed}``; ``args`` are
    appended after it unchanged.
    """
    topic_flag = f"--topic=testdrive-{topic}-{self._seed}"
    return self._composition.run("kgen", topic_flag, *args)
def Messages(self) ‑> int | None

Return the sum of all messages in the system from mz_internal.mz_message_counts_per_worker

Expand source code Browse git
def Messages(self) -> int | None:
    """Return the sum of all messages in the system from mz_internal.mz_message_counts_per_worker"""

    def one_count(e: Docker) -> int | None:
        result = e._composition.sql_query(
            dedent(
                """
                SELECT SUM(sent) as cnt
                FROM
                    mz_internal.mz_message_counts_per_worker mc,
                    mz_internal.mz_dataflow_channel_operators_per_worker c
                WHERE
                    c.id = mc.channel_id AND
                    c.worker_id = mc.from_worker_id AND
                    from_operator_id IN (
                        SELECT dod.id
                        FROM mz_internal.mz_dataflow_operator_dataflows dod
                        WHERE dod.dataflow_name NOT LIKE '%oneshot-select%'
                        AND dod.dataflow_name NOT LIKE '%subscribe%'
                    )
                """
            )
        )
        if len(result) == 0:
            return None
        elif result[0][0] is None:
            return None
        else:
            return int(result[0][0])

    # Loop until the message count converges
    prev_count: int | None = None
    for i in range(50):
        new_count = one_count(self)
        if new_count is not None and prev_count is not None:
            pct = (max(prev_count, new_count) / min(prev_count, new_count)) - 1
            # It has converged
            if pct < 0.05 and i > 2:
                return new_count

        # No message count data available
        if new_count is None and i > 2:
            return new_count

        prev_count = new_count
        time.sleep(0.1)

    return None
def RestartMz(self) ‑> None
Expand source code Browse git
def RestartMz(self) -> None:
    """Kill the "materialized" service, then start it again."""
    compose = self._composition
    compose.kill("materialized")
    # Bring the service back under an override of the stored Materialized()
    # definition so that it restarts with the same parameters
    # (docker tag, SIZE) it was initially started with.
    with compose.override(self._materialized):
        compose.up("materialized")
def Td(self, input: str) ‑> Any
Expand source code Browse git
def Td(self, input: str) -> Any:
    """Feed ``input`` to testdrive via the composition and return the captured stdout."""
    testdrive_flags = [
        "--no-reset",
        f"--seed={self._seed}",
        "--initial-backoff=10ms",  # Retry every 10ms until success
        "--backoff-factor=0",
        "--consistency-checks=disable",
        f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
    ]
    completed = self._composition.exec(
        "testdrive", *testdrive_flags, stdin=input, capture=True
    )
    return completed.stdout

Inherited members

class Executor
Expand source code Browse git
class Executor:
    _known_fragments: set[str] = set()

    def Lambda(self, _lambda: Callable[["Executor"], float]) -> float:
        return _lambda(self)

    def Td(self, input: str) -> Any:
        raise NotImplementedError

    def Kgen(self, topic: str, args: list[str]) -> Any:
        raise NotImplementedError

    def add_known_fragment(self, fragment: str) -> bool:
        """
        Record whether a TD fragment has been printed already. Returns true
        if it wasn't added before.
        """
        result = fragment not in self._known_fragments
        self._known_fragments.add(fragment)
        return result

    def DockerMem(self) -> int:
        raise NotImplementedError

    def Messages(self) -> int | None:
        raise NotImplementedError

Subclasses

Methods

def DockerMem(self) ‑> int
Expand source code Browse git
def DockerMem(self) -> int:
    """Not implemented here; always raises ``NotImplementedError``."""
    raise NotImplementedError()
def Kgen(self, topic: str, args: list[str]) ‑> Any
Expand source code Browse git
def Kgen(self, topic: str, args: list[str]) -> Any:
    """Not implemented here; always raises ``NotImplementedError``."""
    raise NotImplementedError()
def Lambda(self, _lambda: collections.abc.Callable[['Executor'], float]) ‑> float
Expand source code Browse git
def Lambda(self, _lambda: Callable[["Executor"], float]) -> float:
    """Call ``_lambda`` with this executor and return what it returns."""
    outcome = _lambda(self)
    return outcome
def Messages(self) ‑> int | None
Expand source code Browse git
def Messages(self) -> int | None:
    raise NotImplementedError
def Td(self, input: str) ‑> Any
Expand source code Browse git
def Td(self, input: str) -> Any:
    """Not implemented here; always raises ``NotImplementedError``."""
    raise NotImplementedError()
def add_known_fragment(self, fragment: str) ‑> bool

Record whether a TD fragment has been printed already. Returns true if it wasn't added before.

Expand source code Browse git
def add_known_fragment(self, fragment: str) -> bool:
    """
    Remember a TD fragment so that it is only reported as new once.

    Returns True the first time ``fragment`` is seen and False on every
    later call with the same fragment.
    """
    if fragment in self._known_fragments:
        return False
    self._known_fragments.add(fragment)
    return True
class MzCloud (composition: Composition, seed: int, mzcloud_url: str, external_addr: str)
Expand source code Browse git
class MzCloud(Executor):
    """Executor that points testdrive at an explicitly given Materialize URL.

    Kafka (port 9092) and the schema registry (port 8081) are addressed
    through ``external_addr``.
    """

    def __init__(
        self,
        composition: Composition,
        seed: int,
        mzcloud_url: str,
        external_addr: str,
    ) -> None:
        """Store the connection settings and precompute the shared testdrive arguments."""
        self._composition = composition
        self._seed = seed
        self._mzcloud_url = mzcloud_url
        self._external_addr = external_addr
        # Arguments common to every testdrive invocation of this executor.
        self._testdrive_args = [
            f"--materialize-url={self._mzcloud_url}",
            f"--kafka-addr={self._external_addr}:9092",
            f"--schema-registry-url=http://{self._external_addr}:8081",
            f"--seed={self._seed}",
            f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
        ]

    def RestartMz(self) -> None:
        """Always fail: restarting is not possible for this executor.

        Raises:
            AssertionError: on every call.
        """
        # We can't restart the cloud, so complain. Raised explicitly because
        # a bare `assert False` is removed when Python runs with -O.
        raise AssertionError("MzCloud cannot restart Materialize")

    def Reset(self) -> None:
        """Run testdrive with empty input and a 10s default timeout.

        NOTE(review): ``--no-reset`` is not passed here, so presumably this is
        what lets testdrive perform its own reset -- confirm against testdrive.
        """
        print("resetting")
        self._composition.exec(
            "testdrive",
            *self._testdrive_args,
            # Use a lower timeout so we complain if the mzcloud_url was wrong or inaccessible.
            "--default-timeout=10s",
            stdin="",
        )
        print("reset done")

    def Td(self, input: str) -> Any:
        """Feed ``input`` to testdrive via the composition and return the captured stdout."""
        return self._composition.exec(
            "testdrive",
            "--no-reset",
            *self._testdrive_args,
            "--initial-backoff=10ms",
            "--backoff-factor=0",
            "--consistency-checks=disable",
            stdin=input,
            capture=True,
        ).stdout

    def Kgen(self, topic: str, args: list[str]) -> Any:
        """Always fail: kgen is not implemented for this executor yet.

        Raises:
            AssertionError: on every call.
        """
        # TODO: Implement
        # Raised explicitly because a bare `assert False` is removed under -O.
        raise AssertionError("MzCloud.Kgen is not implemented")

Ancestors

Methods

def Kgen(self, topic: str, args: list[str]) ‑> Any
Expand source code Browse git
def Kgen(self, topic: str, args: list[str]) -> Any:
    """Always fail: kgen is not implemented for this executor yet.

    Raises:
        AssertionError: on every call.
    """
    # TODO: Implement
    # Raised explicitly because a bare `assert False` is removed under -O.
    raise AssertionError("MzCloud.Kgen is not implemented")
def Reset(self) ‑> None
Expand source code Browse git
def Reset(self) -> None:
    """Run testdrive with empty input, printing a line before and after.

    NOTE(review): ``--no-reset`` is not passed here, so presumably this is
    what lets testdrive perform its own reset -- confirm against testdrive.
    """
    # Use a lower timeout so we complain if the mzcloud_url was wrong or inaccessible.
    timeout_flag = "--default-timeout=10s"
    print("resetting")
    self._composition.exec("testdrive", *self._testdrive_args, timeout_flag, stdin="")
    print("reset done")
def RestartMz(self) ‑> None
Expand source code Browse git
def RestartMz(self) -> None:
    """Always fail: restarting is not possible for this executor.

    Raises:
        AssertionError: on every call.
    """
    # We can't restart the cloud, so complain. Raised explicitly because
    # a bare `assert False` is removed when Python runs with -O.
    raise AssertionError("MzCloud cannot restart Materialize")
def Td(self, input: str) ‑> Any
Expand source code Browse git
def Td(self, input: str) -> Any:
    """Feed ``input`` to testdrive via the composition and return the captured stdout."""
    testdrive_flags = [
        "--no-reset",
        *self._testdrive_args,
        "--initial-backoff=10ms",
        "--backoff-factor=0",
        "--consistency-checks=disable",
    ]
    completed = self._composition.exec(
        "testdrive", *testdrive_flags, stdin=input, capture=True
    )
    return completed.stdout

Inherited members