Module materialize.scalability.workload

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

from psycopg import Cursor

from materialize.scalability.endpoint import Endpoint
from materialize.scalability.operation import Operation
from materialize.scalability.operation_data import OperationData
from materialize.scalability.schema import Schema


class Workload:
    """Base class for a scalability workload made up of ``Operation`` objects.

    ``operations`` has no default implementation and raises
    ``NotImplementedError``; the other methods provide defaults that
    subclasses may override.
    """

    def init_operations(self) -> list[Operation]:
        """Return the workload's init operations; the default is an empty list."""
        no_init_operations: list[Operation] = []
        return no_init_operations

    def operations(self) -> list[Operation]:
        """Return the workload's operations.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()

    def execute_operation(
        self,
        operation: Operation,
        cursor: Cursor,
        worker_id: int,
        transaction_index: int,
        verbose: bool,
    ) -> None:
        """Execute ``operation`` on data built from ``cursor`` and ``worker_id``.

        Args:
            operation: The operation to execute.
            cursor: Cursor handed to the ``OperationData`` for this call.
            worker_id: Worker identifier handed to the ``OperationData`` and
                included in the verbose output.
            transaction_index: Index shown in the verbose output only.
            verbose: When true, print one line describing the call before
                the operation is executed.
        """
        operation_data = OperationData(cursor, worker_id)
        # The hook sees the data before anything is printed or executed.
        self.amend_data_before_execution(operation_data)

        if verbose:
            print(f"#{transaction_index}: {operation} (worker_id={worker_id})")

        operation.execute(operation_data)

    def amend_data_before_execution(self, data: OperationData) -> None:
        """Hook invoked by ``execute_operation`` on the freshly built data.

        The base implementation does nothing.
        """
        return None

    def name(self) -> str:
        """Return the class name of this instance."""
        workload_class = self.__class__
        return workload_class.__name__


class WorkloadWithContext(Workload):
    """A ``Workload`` that also carries an ``Endpoint`` and a ``Schema``.

    Both attributes are only declared here; they hold no value until the
    corresponding setter has been called.
    """

    # Assigned through set_endpoint().
    endpoint: Endpoint
    # Assigned through set_schema().
    schema: Schema

    def set_endpoint(self, endpoint: Endpoint) -> None:
        """Store ``endpoint`` on this workload."""
        self.endpoint = endpoint

    def set_schema(self, schema: Schema) -> None:
        """Store ``schema`` on this workload."""
        self.schema = schema

Classes

class Workload
Expand source code Browse git
class Workload:
    """Base class for a scalability workload made up of ``Operation`` objects.

    ``operations`` has no default implementation and raises
    ``NotImplementedError``; the other methods provide defaults that
    subclasses may override.
    """

    def init_operations(self) -> list[Operation]:
        """Return the workload's init operations; the default is an empty list."""
        no_init_operations: list[Operation] = []
        return no_init_operations

    def operations(self) -> list[Operation]:
        """Return the workload's operations.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()

    def execute_operation(
        self,
        operation: Operation,
        cursor: Cursor,
        worker_id: int,
        transaction_index: int,
        verbose: bool,
    ) -> None:
        """Execute ``operation`` on data built from ``cursor`` and ``worker_id``.

        Args:
            operation: The operation to execute.
            cursor: Cursor handed to the ``OperationData`` for this call.
            worker_id: Worker identifier handed to the ``OperationData`` and
                included in the verbose output.
            transaction_index: Index shown in the verbose output only.
            verbose: When true, print one line describing the call before
                the operation is executed.
        """
        operation_data = OperationData(cursor, worker_id)
        # The hook sees the data before anything is printed or executed.
        self.amend_data_before_execution(operation_data)

        if verbose:
            print(f"#{transaction_index}: {operation} (worker_id={worker_id})")

        operation.execute(operation_data)

    def amend_data_before_execution(self, data: OperationData) -> None:
        """Hook invoked by ``execute_operation`` on the freshly built data.

        The base implementation does nothing.
        """
        return None

    def name(self) -> str:
        """Return the class name of this instance."""
        workload_class = self.__class__
        return workload_class.__name__

Subclasses

Methods

def amend_data_before_execution(self, data: OperationData) ‑> None
Expand source code Browse git
def amend_data_before_execution(self, data: OperationData) -> None:
    """Hook for adjusting ``data`` before an operation executes.

    This implementation does nothing and returns ``None``.
    """
    return None
def execute_operation(self, operation: Operation, cursor: psycopg.Cursor, worker_id: int, transaction_index: int, verbose: bool) ‑> None
Expand source code Browse git
def execute_operation(
    self,
    operation: Operation,
    cursor: Cursor,
    worker_id: int,
    transaction_index: int,
    verbose: bool,
) -> None:
    """Execute ``operation`` on data built from ``cursor`` and ``worker_id``.

    Args:
        operation: The operation to execute.
        cursor: Cursor handed to the ``OperationData`` for this call.
        worker_id: Worker identifier handed to the ``OperationData`` and
            included in the verbose output.
        transaction_index: Index shown in the verbose output only.
        verbose: When true, print one line describing the call before the
            operation is executed.
    """
    operation_data = OperationData(cursor, worker_id)
    # The hook sees the data before anything is printed or executed.
    self.amend_data_before_execution(operation_data)

    if verbose:
        print(f"#{transaction_index}: {operation} (worker_id={worker_id})")

    operation.execute(operation_data)
def init_operations(self) ‑> list[Operation]
Expand source code Browse git
def init_operations(self) -> list[Operation]:
    """Return the init operations; this implementation returns an empty list."""
    no_init_operations: list[Operation] = []
    return no_init_operations
def name(self) ‑> str
Expand source code Browse git
def name(self) -> str:
    """Return the class name of ``self``."""
    workload_class = self.__class__
    return workload_class.__name__
def operations(self) ‑> list[Operation]
Expand source code Browse git
def operations(self) -> list[Operation]:
    """Return the workload's operations.

    Raises:
        NotImplementedError: always, in this implementation.
    """
    raise NotImplementedError()
class WorkloadWithContext
Expand source code Browse git
class WorkloadWithContext(Workload):
    """A ``Workload`` that also carries an ``Endpoint`` and a ``Schema``.

    Both attributes are only declared here; they hold no value until the
    corresponding setter has been called.
    """

    # Assigned through set_endpoint().
    endpoint: Endpoint
    # Assigned through set_schema().
    schema: Schema

    def set_endpoint(self, endpoint: Endpoint) -> None:
        """Store ``endpoint`` on this workload."""
        self.endpoint = endpoint

    def set_schema(self, schema: Schema) -> None:
        """Store ``schema`` on this workload."""
        self.schema = schema

Ancestors

Workload

Subclasses

Class variables

var endpoint : Endpoint
var schema : Schema

Methods

def set_endpoint(self, endpoint: Endpoint) ‑> None
Expand source code Browse git
def set_endpoint(self, endpoint: Endpoint) -> None:
    """Store ``endpoint`` as the ``endpoint`` attribute of ``self``."""
    self.endpoint = endpoint
def set_schema(self, schema: Schema) ‑> None
Expand source code Browse git
def set_schema(self, schema: Schema) -> None:
    """Store ``schema`` as the ``schema`` attribute of ``self``."""
    self.schema = schema