Module materialize.data_ingest.workload

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import random
import time
from collections.abc import Iterator
from typing import Any

import pg8000

from materialize.data_ingest.data_type import DATA_TYPES_FOR_AVRO, DATA_TYPES_FOR_KEY
from materialize.data_ingest.definition import (
    Delete,
    Insert,
    Keyspace,
    Records,
    RecordSize,
    Upsert,
)
from materialize.data_ingest.executor import PgExecutor, PrintExecutor
from materialize.data_ingest.field import Field
from materialize.data_ingest.transaction import Transaction
from materialize.data_ingest.transaction_def import (
    RestartMz,
    TransactionDef,
    TransactionSize,
)
from materialize.mzcompose.composition import Composition
from materialize.util import all_subclasses


class Workload:
    """Base class for an endlessly repeating sequence of transaction phases."""

    # One pass of phases; generate() replays this list forever.
    cycle: list[TransactionDef]

    def __init__(self, composition: Composition | None) -> None:
        raise NotImplementedError

    def generate(self, fields: list[Field]) -> Iterator[Transaction]:
        """Cycle through the phases forever, yielding only non-empty transactions."""
        while True:
            for phase in self.cycle:
                yield from (txn for txn in phase.generate(fields) if txn)

class SingleSensorUpdating(Workload):
    """Repeatedly upsert one small record into a single key."""

    def __init__(self, composition: Composition | None) -> None:
        upsert = Upsert(
            keyspace=Keyspace.SINGLE_VALUE,
            count=Records.ONE,
            record_size=RecordSize.SMALL,
        )
        self.cycle = [TransactionDef([upsert])]


class SingleSensorUpdatingDisruptions(Workload):
    """Single-key upserts interleaved with occasional Materialize restarts."""

    def __init__(self, composition: Composition | None) -> None:
        upsert = Upsert(
            keyspace=Keyspace.SINGLE_VALUE,
            count=Records.ONE,
            record_size=RecordSize.SMALL,
        )
        self.cycle = [TransactionDef([upsert])]
        if composition:
            # With probability 0.1 per cycle, restart Mz to exercise recovery.
            self.cycle.append(RestartMz(composition, probability=0.1))


class DeleteDataAtEndOfDay(Workload):
    """Bulk-insert records, then wipe them all in one transaction."""

    def __init__(self, composition: Composition | None) -> None:
        insert = Insert(
            count=Records.SOME,
            record_size=RecordSize.SMALL,
        )
        delete_all = Delete(
            number_of_records=Records.ALL,
            record_size=RecordSize.SMALL,
            num=insert.max_key(),
        )
        self.cycle = [
            # Phase 1: load records in huge transactions.
            TransactionDef(size=TransactionSize.HUGE, operations=[insert]),
            # Phase 2: delete every record in a single transaction.
            TransactionDef([delete_all]),
        ]


class DeleteDataAtEndOfDayDisruptions(Workload):
    """Insert-then-delete-all cycle with occasional Materialize restarts."""

    def __init__(self, composition: Composition | None) -> None:
        insert = Insert(
            count=Records.SOME,
            record_size=RecordSize.SMALL,
        )
        delete_all = Delete(
            number_of_records=Records.ALL,
            record_size=RecordSize.SMALL,
            num=insert.max_key(),
        )
        self.cycle = [
            # Phase 1: load records in huge transactions.
            TransactionDef(size=TransactionSize.HUGE, operations=[insert]),
            # Phase 2: delete every record in a single transaction.
            TransactionDef([delete_all]),
        ]
        if composition:
            # With probability 0.1 per cycle, restart Mz to exercise recovery.
            self.cycle.append(RestartMz(composition, probability=0.1))


# TODO: Implement
# class ProgressivelyEnrichRecords(Workload):
#    def __init__(self) -> None:
#        self.cycle: list[Definition] = [
#        ]


WORKLOADS = all_subclasses(Workload)


def execute_workload(
    executor_classes: list[Any],
    workload: Workload,
    num: int,
    ports: dict[str, int],
    runtime: int,
    verbose: bool,
) -> None:
    """Run ``workload`` against all executors for ``runtime`` seconds, then
    verify that each executor converges to the same result as the Postgres
    oracle.

    Args:
        executor_classes: Executor classes to run in addition to PgExecutor.
        workload: Workload whose generated transactions are executed.
        num: Identifier forwarded to each executor constructor.
        ports: Service name -> host port mapping; must contain "materialized".
        runtime: Wall-clock seconds to keep generating transactions.
        verbose: If True, additionally print every transaction.

    Raises:
        ValueError: If an executor's result never stabilizes to the expected
            Postgres result within the retry window.
    """
    # Random schema: at least one key field, zero or more value fields.
    fields = [
        Field(f"key{i}", random.choice(DATA_TYPES_FOR_KEY), True)
        for i in range(random.randint(1, 10))
    ] + [
        Field(f"value{i}", random.choice(DATA_TYPES_FOR_AVRO), False)
        for i in range(random.randint(0, 20))
    ]
    print(f"With fields: {fields}")

    # PgExecutor always runs first and serves as the correctness oracle.
    executors = [
        executor_class(num, ports, fields, "materialize")
        for executor_class in [PgExecutor] + executor_classes
    ]
    pg_executor = executors[0]

    start = time.time()

    run_executors = ([PrintExecutor(ports)] if verbose else []) + executors
    for exe in run_executors:
        exe.create()
    for i, transaction in enumerate(workload.generate(fields)):
        duration = time.time() - start
        if duration > runtime:
            print(f"Ran {i} transactions in {duration} s")
            assert i > 0
            break
        for executor in run_executors:
            executor.run(transaction)

    # ORDER BY every column (by ordinal position) for deterministic comparison.
    order_str = ", ".join(str(i + 1) for i in range(len(fields)))

    with pg_executor.pg_conn.cursor() as cur:
        cur.execute(f"SELECT * FROM {pg_executor.table} ORDER BY {order_str}")
        expected_result = cur.fetchall()
        print(f"Expected (via Postgres): {expected_result}")

    # Reconnect as Mz disruptions may have destroyed the previous connection
    conn = pg8000.connect(
        host="localhost",
        port=ports["materialized"],
        user="materialize",
        database="materialize",
    )
    try:
        for executor in executors:
            _verify_executor(conn, executor, order_str, expected_result)
    finally:
        # The original leaked this connection; close it even on failure.
        conn.close()


def _verify_executor(
    conn: Any, executor: Any, order_str: str, expected_result: list[Any]
) -> None:
    """Poll ``executor``'s table until it matches ``expected_result`` twice in
    a row (to confirm the result is stable), retrying with exponential backoff.

    Raises:
        ValueError: If the result has not stabilized before the backoff
            interval reaches 60s.
    """
    actual_result = None  # pre-bound so the final error message is always valid
    correct_once = False
    sleep_time = 0.1
    while sleep_time < 60:
        conn.autocommit = True
        with conn.cursor() as cur:
            try:
                cur.execute(f"SELECT * FROM {executor.table} ORDER BY {order_str}")
            except Exception:
                # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
                # are not intercepted; still logged and re-raised.
                print(f"Comparing against {type(executor).__name__} failed")
                raise
            actual_result = cur.fetchall()
        conn.autocommit = False
        if actual_result == expected_result:
            if correct_once:
                return
            print(
                "Results match. Check for correctness again to make sure the result is stable"
            )
            correct_once = True
            time.sleep(sleep_time)
            continue
        else:
            print(f"Unexpected ({type(executor).__name__}): {actual_result}")
        print(f"Results don't match, sleeping for {sleep_time}s")
        time.sleep(sleep_time)
        sleep_time *= 2
    raise ValueError(f"Unexpected result {actual_result} != {expected_result}")

Functions

def execute_workload(executor_classes: list[typing.Any], workload: Workload, num: int, ports: dict[str, int], runtime: int, verbose: bool) ‑> None
Expand source code Browse git
def execute_workload(
    executor_classes: list[Any],
    workload: Workload,
    num: int,
    ports: dict[str, int],
    runtime: int,
    verbose: bool,
) -> None:
    fields = []

    for i in range(random.randint(1, 10)):
        fields.append(Field(f"key{i}", random.choice(DATA_TYPES_FOR_KEY), True))
    for i in range(random.randint(0, 20)):
        fields.append(Field(f"value{i}", random.choice(DATA_TYPES_FOR_AVRO), False))
    print(f"With fields: {fields}")

    executors = [
        executor_class(num, ports, fields, "materialize")
        for executor_class in [PgExecutor] + executor_classes
    ]
    pg_executor = executors[0]

    start = time.time()

    run_executors = ([PrintExecutor(ports)] if verbose else []) + executors
    for exe in run_executors:
        exe.create()
    for i, transaction in enumerate(workload.generate(fields)):
        duration = time.time() - start
        if duration > runtime:
            print(f"Ran {i} transactions in {duration} s")
            assert i > 0
            break
        for executor in run_executors:
            executor.run(transaction)

    order_str = ", ".join(str(i + 1) for i in range(len(fields)))

    with pg_executor.pg_conn.cursor() as cur:
        cur.execute(f"SELECT * FROM {pg_executor.table} ORDER BY {order_str}")
        expected_result = cur.fetchall()
        print(f"Expected (via Postgres): {expected_result}")

    # Reconnect as Mz disruptions may have destroyed the previous connection
    conn = pg8000.connect(
        host="localhost",
        port=ports["materialized"],
        user="materialize",
        database="materialize",
    )

    for executor in executors:
        correct_once = False
        sleep_time = 0.1
        while sleep_time < 60:
            conn.autocommit = True
            with conn.cursor() as cur:
                try:
                    cur.execute(f"SELECT * FROM {executor.table} ORDER BY {order_str}")
                except:
                    print(f"Comparing against {type(executor).__name__} failed")
                    raise
                actual_result = cur.fetchall()
            conn.autocommit = False
            if actual_result == expected_result:
                if correct_once:
                    break
                print(
                    "Results match. Check for correctness again to make sure the result is stable"
                )
                correct_once = True
                time.sleep(sleep_time)
                continue
            else:
                print(f"Unexpected ({type(executor).__name__}): {actual_result}")
            print(f"Results don't match, sleeping for {sleep_time}s")
            time.sleep(sleep_time)
            sleep_time *= 2
        else:
            raise ValueError(f"Unexpected result {actual_result} != {expected_result}")  # type: ignore

Classes

class DeleteDataAtEndOfDay (composition: Composition | None)
Expand source code Browse git
class DeleteDataAtEndOfDay(Workload):
    def __init__(self, composition: Composition | None) -> None:
        insert = Insert(
            count=Records.SOME,
            record_size=RecordSize.SMALL,
        )
        insert_phase = TransactionDef(
            size=TransactionSize.HUGE,
            operations=[insert],
        )
        # Delete all records in a single transaction
        delete_phase = TransactionDef(
            [
                Delete(
                    number_of_records=Records.ALL,
                    record_size=RecordSize.SMALL,
                    num=insert.max_key(),
                )
            ]
        )
        self.cycle = [
            insert_phase,
            delete_phase,
        ]

Ancestors

class DeleteDataAtEndOfDayDisruptions (composition: Composition | None)
Expand source code Browse git
class DeleteDataAtEndOfDayDisruptions(Workload):
    def __init__(self, composition: Composition | None) -> None:
        insert = Insert(
            count=Records.SOME,
            record_size=RecordSize.SMALL,
        )
        insert_phase = TransactionDef(
            size=TransactionSize.HUGE,
            operations=[insert],
        )
        # Delete all records in a single transaction
        delete_phase = TransactionDef(
            [
                Delete(
                    number_of_records=Records.ALL,
                    record_size=RecordSize.SMALL,
                    num=insert.max_key(),
                )
            ]
        )
        self.cycle = [
            insert_phase,
            delete_phase,
        ]

        if composition:
            self.cycle.append(RestartMz(composition, probability=0.1))

Ancestors

class SingleSensorUpdating (composition: Composition | None)
Expand source code Browse git
class SingleSensorUpdating(Workload):
    def __init__(self, composition: Composition | None) -> None:
        self.cycle = [
            TransactionDef(
                [
                    Upsert(
                        keyspace=Keyspace.SINGLE_VALUE,
                        count=Records.ONE,
                        record_size=RecordSize.SMALL,
                    )
                ]
            )
        ]

Ancestors

class SingleSensorUpdatingDisruptions (composition: Composition | None)
Expand source code Browse git
class SingleSensorUpdatingDisruptions(Workload):
    def __init__(self, composition: Composition | None) -> None:
        self.cycle = [
            TransactionDef(
                [
                    Upsert(
                        keyspace=Keyspace.SINGLE_VALUE,
                        count=Records.ONE,
                        record_size=RecordSize.SMALL,
                    ),
                ]
            ),
        ]
        if composition:
            self.cycle.append(RestartMz(composition, probability=0.1))

Ancestors

class Workload (composition: Composition | None)
Expand source code Browse git
class Workload:
    cycle: list[TransactionDef]

    def __init__(self, composition: Composition | None) -> None:
        raise NotImplementedError

    def generate(self, fields: list[Field]) -> Iterator[Transaction]:
        while True:
            for transaction_def in self.cycle:
                for transaction in transaction_def.generate(fields):
                    if transaction:
                        yield transaction

Subclasses

Class variables

var cycle : list[TransactionDef]

Methods

def generate(self, fields: list[Field]) ‑> collections.abc.Iterator[Transaction]
Expand source code Browse git
def generate(self, fields: list[Field]) -> Iterator[Transaction]:
    while True:
        for transaction_def in self.cycle:
            for transaction in transaction_def.generate(fields):
                if transaction:
                    yield transaction