# Module materialize.data_ingest.workload
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import random
import time
from collections.abc import Iterator
from typing import Any
import pg8000
from materialize.data_ingest.data_type import DATA_TYPES_FOR_AVRO, DATA_TYPES_FOR_KEY
from materialize.data_ingest.definition import (
Delete,
Insert,
Keyspace,
Records,
RecordSize,
Upsert,
)
from materialize.data_ingest.executor import PgExecutor, PrintExecutor
from materialize.data_ingest.field import Field
from materialize.data_ingest.transaction import Transaction
from materialize.data_ingest.transaction_def import (
RestartMz,
TransactionDef,
TransactionSize,
)
from materialize.mzcompose.composition import Composition
from materialize.util import all_subclasses
class Workload:
    """Base class for ingest workloads.

    A workload is an endlessly repeated cycle of transaction definitions;
    subclasses populate ``cycle`` in their ``__init__``.
    """

    # Sequence of transaction definitions replayed forever by generate().
    cycle: list[TransactionDef]

    def __init__(self, composition: Composition | None) -> None:
        # Abstract: concrete workloads must build self.cycle themselves.
        raise NotImplementedError

    def generate(self, fields: list[Field]) -> Iterator[Transaction]:
        """Yield transactions by looping over ``cycle`` indefinitely.

        Empty/falsy transactions produced by a definition are skipped.
        """
        while True:
            for definition in self.cycle:
                yield from (txn for txn in definition.generate(fields) if txn)
class SingleSensorUpdating(Workload):
    """A single sensor key that is upserted over and over, one small record per transaction."""

    def __init__(self, composition: Composition | None) -> None:
        sensor_update = Upsert(
            keyspace=Keyspace.SINGLE_VALUE,
            count=Records.ONE,
            record_size=RecordSize.SMALL,
        )
        self.cycle = [TransactionDef([sensor_update])]
class SingleSensorUpdatingDisruptions(Workload):
    """Single-sensor upserts interleaved with occasional Materialize restarts."""

    def __init__(self, composition: Composition | None) -> None:
        sensor_update = Upsert(
            keyspace=Keyspace.SINGLE_VALUE,
            count=Records.ONE,
            record_size=RecordSize.SMALL,
        )
        self.cycle = [TransactionDef([sensor_update])]
        if composition:
            # Disruption: restart Mz with 10% probability per pass through the cycle.
            self.cycle.append(RestartMz(composition, probability=0.1))
class DeleteDataAtEndOfDay(Workload):
    """Bulk-insert a batch of records, then delete them all in one transaction."""

    def __init__(self, composition: Composition | None) -> None:
        insert = Insert(
            count=Records.SOME,
            record_size=RecordSize.SMALL,
        )
        # Phase 1: one huge transaction full of inserts.
        bulk_insert = TransactionDef(
            size=TransactionSize.HUGE,
            operations=[insert],
        )
        # Phase 2: wipe every record inserted above in a single transaction.
        wipe_all = TransactionDef(
            [
                Delete(
                    number_of_records=Records.ALL,
                    record_size=RecordSize.SMALL,
                    num=insert.max_key(),
                )
            ]
        )
        self.cycle = [bulk_insert, wipe_all]
class DeleteDataAtEndOfDayDisruptions(DeleteDataAtEndOfDay):
    """DeleteDataAtEndOfDay with an occasional Materialize restart appended to the cycle.

    Inherits the insert/delete phases from DeleteDataAtEndOfDay instead of
    duplicating their construction (the two bodies were previously identical
    except for the RestartMz suffix).
    """

    # NOTE(review): this now makes the class a (transitive) subclass of
    # Workload via DeleteDataAtEndOfDay; assumes all_subclasses() is
    # recursive so WORKLOADS still picks it up — confirm in materialize.util.
    def __init__(self, composition: Composition | None) -> None:
        super().__init__(composition)
        if composition:
            # Disruption: restart Mz with 10% probability per pass through the cycle.
            self.cycle.append(RestartMz(composition, probability=0.1))
# TODO: Implement
# class ProgressivelyEnrichRecords(Workload):
# def __init__(self) -> None:
# self.cycle: list[Definition] = [
# ]
# All concrete workload classes, discovered via reflection over Workload's
# subclass tree so newly defined workloads are picked up automatically.
WORKLOADS = all_subclasses(Workload)
def _verify_executor(
    conn: Any,
    executor: Any,
    expected_result: list[Any],
    order_str: str,
) -> None:
    """Poll ``executor``'s output table until it matches ``expected_result``.

    A match must be observed twice in a row (the second check confirms the
    result is stable, not a transient coincidence). Mismatches are retried
    with exponential backoff starting at 0.1s; once the backoff reaches 60s,
    a ValueError is raised.
    """
    correct_once = False
    sleep_time = 0.1
    while sleep_time < 60:
        conn.autocommit = True
        with conn.cursor() as cur:
            try:
                cur.execute(f"SELECT * FROM {executor.table} ORDER BY {order_str}")
            # Narrowed from a bare `except:` — we still re-raise, but no longer
            # intercept KeyboardInterrupt/SystemExit just to print a message.
            except Exception:
                print(f"Comparing against {type(executor).__name__} failed")
                raise
            actual_result = cur.fetchall()
        conn.autocommit = False
        if actual_result == expected_result:
            if correct_once:
                return
            print(
                "Results match. Check for correctness again to make sure the result is stable"
            )
            correct_once = True
            # Re-check after the same delay; backoff only grows on mismatch.
            time.sleep(sleep_time)
            continue
        else:
            print(f"Unexpected ({type(executor).__name__}): {actual_result}")
        print(f"Results don't match, sleeping for {sleep_time}s")
        time.sleep(sleep_time)
        sleep_time *= 2
    raise ValueError(f"Unexpected result {actual_result} != {expected_result}")  # type: ignore


def execute_workload(
    executor_classes: list[Any],
    workload: Workload,
    num: int,
    ports: dict[str, int],
    runtime: int,
    verbose: bool,
) -> None:
    """Run ``workload`` against every executor for ~``runtime`` seconds and verify.

    A PgExecutor always runs first and acts as the source of truth; after the
    time budget is spent, each executor's output table is compared against the
    Postgres result until it matches stably (see _verify_executor).

    Raises:
        ValueError: if an executor's results never stabilize to the expected ones.
        AssertionError: if not even one transaction completed within ``runtime``.
    """
    # Randomized schema: 1-10 key fields, 0-20 value fields.
    fields = [
        Field(f"key{i}", random.choice(DATA_TYPES_FOR_KEY), True)
        for i in range(random.randint(1, 10))
    ]
    fields += [
        Field(f"value{i}", random.choice(DATA_TYPES_FOR_AVRO), False)
        for i in range(random.randint(0, 20))
    ]
    print(f"With fields: {fields}")
    executors = [
        executor_class(num, ports, fields, "materialize")
        for executor_class in [PgExecutor] + executor_classes
    ]
    pg_executor = executors[0]

    start = time.time()
    # PrintExecutor only logs transactions; it participates in the run but not
    # in the later verification pass.
    run_executors = ([PrintExecutor(ports)] if verbose else []) + executors
    for executor in run_executors:
        executor.create()
    for i, transaction in enumerate(workload.generate(fields)):
        duration = time.time() - start
        if duration > runtime:
            print(f"Ran {i} transactions in {duration} s")
            assert i > 0
            break
        for executor in run_executors:
            executor.run(transaction)

    # ORDER BY every column (1-based ordinal positions) so comparisons are
    # deterministic regardless of row order.
    order_str = ", ".join(str(i + 1) for i in range(len(fields)))
    with pg_executor.pg_conn.cursor() as cur:
        cur.execute(f"SELECT * FROM {pg_executor.table} ORDER BY {order_str}")
        expected_result = cur.fetchall()
    print(f"Expected (via Postgres): {expected_result}")

    # Reconnect as Mz disruptions may have destroyed the previous connection
    conn = pg8000.connect(
        host="localhost",
        port=ports["materialized"],
        user="materialize",
        database="materialize",
    )
    for executor in executors:
        _verify_executor(conn, executor, expected_result, order_str)
Functions
def execute_workload(executor_classes: list[typing.Any], workload: Workload, num: int, ports: dict[str, int], runtime: int, verbose: bool) ‑> None
-
Expand source code Browse git
def execute_workload( executor_classes: list[Any], workload: Workload, num: int, ports: dict[str, int], runtime: int, verbose: bool, ) -> None: fields = [] for i in range(random.randint(1, 10)): fields.append(Field(f"key{i}", random.choice(DATA_TYPES_FOR_KEY), True)) for i in range(random.randint(0, 20)): fields.append(Field(f"value{i}", random.choice(DATA_TYPES_FOR_AVRO), False)) print(f"With fields: {fields}") executors = [ executor_class(num, ports, fields, "materialize") for executor_class in [PgExecutor] + executor_classes ] pg_executor = executors[0] start = time.time() run_executors = ([PrintExecutor(ports)] if verbose else []) + executors for exe in run_executors: exe.create() for i, transaction in enumerate(workload.generate(fields)): duration = time.time() - start if duration > runtime: print(f"Ran {i} transactions in {duration} s") assert i > 0 break for executor in run_executors: executor.run(transaction) order_str = ", ".join(str(i + 1) for i in range(len(fields))) with pg_executor.pg_conn.cursor() as cur: cur.execute(f"SELECT * FROM {pg_executor.table} ORDER BY {order_str}") expected_result = cur.fetchall() print(f"Expected (via Postgres): {expected_result}") # Reconnect as Mz disruptions may have destroyed the previous connection conn = pg8000.connect( host="localhost", port=ports["materialized"], user="materialize", database="materialize", ) for executor in executors: correct_once = False sleep_time = 0.1 while sleep_time < 60: conn.autocommit = True with conn.cursor() as cur: try: cur.execute(f"SELECT * FROM {executor.table} ORDER BY {order_str}") except: print(f"Comparing against {type(executor).__name__} failed") raise actual_result = cur.fetchall() conn.autocommit = False if actual_result == expected_result: if correct_once: break print( "Results match. 
Check for correctness again to make sure the result is stable" ) correct_once = True time.sleep(sleep_time) continue else: print(f"Unexpected ({type(executor).__name__}): {actual_result}") print(f"Results don't match, sleeping for {sleep_time}s") time.sleep(sleep_time) sleep_time *= 2 else: raise ValueError(f"Unexpected result {actual_result} != {expected_result}") # type: ignore
Classes
class DeleteDataAtEndOfDay (composition: Composition | None)
-
Expand source code Browse git
class DeleteDataAtEndOfDay(Workload): def __init__(self, composition: Composition | None) -> None: insert = Insert( count=Records.SOME, record_size=RecordSize.SMALL, ) insert_phase = TransactionDef( size=TransactionSize.HUGE, operations=[insert], ) # Delete all records in a single transaction delete_phase = TransactionDef( [ Delete( number_of_records=Records.ALL, record_size=RecordSize.SMALL, num=insert.max_key(), ) ] ) self.cycle = [ insert_phase, delete_phase, ]
Ancestors
class DeleteDataAtEndOfDayDisruptions (composition: Composition | None)
-
Expand source code Browse git
class DeleteDataAtEndOfDayDisruptions(Workload): def __init__(self, composition: Composition | None) -> None: insert = Insert( count=Records.SOME, record_size=RecordSize.SMALL, ) insert_phase = TransactionDef( size=TransactionSize.HUGE, operations=[insert], ) # Delete all records in a single transaction delete_phase = TransactionDef( [ Delete( number_of_records=Records.ALL, record_size=RecordSize.SMALL, num=insert.max_key(), ) ] ) self.cycle = [ insert_phase, delete_phase, ] if composition: self.cycle.append(RestartMz(composition, probability=0.1))
Ancestors
class SingleSensorUpdating (composition: Composition | None)
-
Expand source code Browse git
class SingleSensorUpdating(Workload): def __init__(self, composition: Composition | None) -> None: self.cycle = [ TransactionDef( [ Upsert( keyspace=Keyspace.SINGLE_VALUE, count=Records.ONE, record_size=RecordSize.SMALL, ) ] ) ]
Ancestors
class SingleSensorUpdatingDisruptions (composition: Composition | None)
-
Expand source code Browse git
class SingleSensorUpdatingDisruptions(Workload): def __init__(self, composition: Composition | None) -> None: self.cycle = [ TransactionDef( [ Upsert( keyspace=Keyspace.SINGLE_VALUE, count=Records.ONE, record_size=RecordSize.SMALL, ), ] ), ] if composition: self.cycle.append(RestartMz(composition, probability=0.1))
Ancestors
class Workload (composition: Composition | None)
-
Expand source code Browse git
class Workload: cycle: list[TransactionDef] def __init__(self, composition: Composition | None) -> None: raise NotImplementedError def generate(self, fields: list[Field]) -> Iterator[Transaction]: while True: for transaction_def in self.cycle: for transaction in transaction_def.generate(fields): if transaction: yield transaction
Subclasses
- DeleteDataAtEndOfDay
- DeleteDataAtEndOfDayDisruptions
- SingleSensorUpdating
- SingleSensorUpdatingDisruptions
Class variables
var cycle : list[TransactionDef]
Methods
def generate(self, fields: list[Field]) ‑> collections.abc.Iterator[Transaction]
-
Expand source code Browse git
def generate(self, fields: list[Field]) -> Iterator[Transaction]: while True: for transaction_def in self.cycle: for transaction in transaction_def.generate(fields): if transaction: yield transaction