Module materialize.scalability.workload
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from psycopg import Cursor
from materialize.scalability.endpoint import Endpoint
from materialize.scalability.operation import Operation
from materialize.scalability.operation_data import OperationData
from materialize.scalability.schema import Schema
class Workload:
    """Base class describing a set of operations and how to execute one.

    ``operations`` must be overridden by subclasses; every other method has
    a working default that subclasses may replace.
    """

    def init_operations(self) -> list[Operation]:
        """Return the init operations; this base implementation has none."""
        return list()

    def operations(self) -> list[Operation]:
        """Return the workload's operations.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()

    def execute_operation(
        self,
        operation: Operation,
        cursor: Cursor,
        worker_id: int,
        transaction_index: int,
        verbose: bool,
    ) -> None:
        """Execute a single operation.

        Builds an ``OperationData`` from ``cursor`` and ``worker_id``, passes
        it through ``amend_data_before_execution``, prints a trace line when
        ``verbose`` is set, and finally calls ``operation.execute`` with it.
        """
        operation_data = OperationData(cursor, worker_id)
        self.amend_data_before_execution(operation_data)

        if verbose:
            print(f"#{transaction_index}: {operation} (worker_id={worker_id})")

        operation.execute(operation_data)

    def amend_data_before_execution(self, data: OperationData) -> None:
        """Hook invoked by ``execute_operation`` before the operation runs.

        Does nothing in this base implementation.
        """

    def name(self) -> str:
        """Return the name of the instance's class."""
        concrete_class = self.__class__
        return concrete_class.__name__
class WorkloadWithContext(Workload):
    """Workload that additionally holds an endpoint and a schema.

    Both attributes are only annotated on the class; they have no value
    until the corresponding setter has been called.
    """

    # Assigned through set_endpoint().
    endpoint: Endpoint
    # Assigned through set_schema().
    schema: Schema

    def set_endpoint(self, endpoint: Endpoint) -> None:
        """Store ``endpoint`` on this workload."""
        self.endpoint = endpoint

    def set_schema(self, schema: Schema) -> None:
        """Store ``schema`` on this workload."""
        self.schema = schema
Classes
class Workload
-
Expand source code Browse git
class Workload:
    """Base class describing a set of operations and how to execute one.

    ``operations`` must be overridden by subclasses; every other method has
    a working default that subclasses may replace.
    """

    def init_operations(self) -> list[Operation]:
        """Return the init operations; this base implementation has none."""
        return list()

    def operations(self) -> list[Operation]:
        """Return the workload's operations.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()

    def execute_operation(
        self,
        operation: Operation,
        cursor: Cursor,
        worker_id: int,
        transaction_index: int,
        verbose: bool,
    ) -> None:
        """Execute a single operation.

        Builds an ``OperationData`` from ``cursor`` and ``worker_id``, passes
        it through ``amend_data_before_execution``, prints a trace line when
        ``verbose`` is set, and finally calls ``operation.execute`` with it.
        """
        operation_data = OperationData(cursor, worker_id)
        self.amend_data_before_execution(operation_data)

        if verbose:
            print(f"#{transaction_index}: {operation} (worker_id={worker_id})")

        operation.execute(operation_data)

    def amend_data_before_execution(self, data: OperationData) -> None:
        """Hook invoked by ``execute_operation`` before the operation runs.

        Does nothing in this base implementation.
        """

    def name(self) -> str:
        """Return the name of the instance's class."""
        concrete_class = self.__class__
        return concrete_class.__name__
Subclasses
Methods
def amend_data_before_execution(self, data: OperationData) ‑> None
-
Expand source code Browse git
def amend_data_before_execution(self, data: OperationData) -> None:
    """Hook for adjusting ``data`` before an operation is executed.

    Does nothing in this implementation.
    """
def execute_operation(self, operation: Operation, cursor: psycopg.Cursor, worker_id: int, transaction_index: int, verbose: bool) ‑> None
-
Expand source code Browse git
def execute_operation(
    self,
    operation: Operation,
    cursor: Cursor,
    worker_id: int,
    transaction_index: int,
    verbose: bool,
) -> None:
    """Execute a single operation.

    Builds an ``OperationData`` from ``cursor`` and ``worker_id``, passes
    it through ``amend_data_before_execution``, prints a trace line when
    ``verbose`` is set, and finally calls ``operation.execute`` with it.
    """
    operation_data = OperationData(cursor, worker_id)
    self.amend_data_before_execution(operation_data)

    if verbose:
        print(f"#{transaction_index}: {operation} (worker_id={worker_id})")

    operation.execute(operation_data)
def init_operations(self) ‑> list[Operation]
-
Expand source code Browse git
def init_operations(self) -> list[Operation]:
    """Return the init operations; this implementation has none."""
    return list()
def name(self) ‑> str
-
Expand source code Browse git
def name(self) -> str:
    """Return the name of the instance's class."""
    concrete_class = self.__class__
    return concrete_class.__name__
def operations(self) ‑> list[Operation]
-
Expand source code Browse git
def operations(self) -> list[Operation]:
    """Return the workload's operations.

    Raises:
        NotImplementedError: always, in this implementation.
    """
    raise NotImplementedError()
class WorkloadWithContext
-
Expand source code Browse git
class WorkloadWithContext(Workload):
    """Workload that additionally holds an endpoint and a schema.

    Both attributes are only annotated on the class; they have no value
    until the corresponding setter has been called.
    """

    # Assigned through set_endpoint().
    endpoint: Endpoint
    # Assigned through set_schema().
    schema: Schema

    def set_endpoint(self, endpoint: Endpoint) -> None:
        """Store ``endpoint`` on this workload."""
        self.endpoint = endpoint

    def set_schema(self, schema: Schema) -> None:
        """Store ``schema`` on this workload."""
        self.schema = schema
Ancestors
Subclasses
Class variables
var endpoint : Endpoint
var schema : Schema
Methods
def set_endpoint(self, endpoint: Endpoint) ‑> None
-
Expand source code Browse git
def set_endpoint(self, endpoint: Endpoint) -> None:
    """Store ``endpoint`` on the instance as its ``endpoint`` attribute."""
    self.endpoint = endpoint
def set_schema(self, schema: Schema) ‑> None
-
Expand source code Browse git
def set_schema(self, schema: Schema) -> None:
    """Store ``schema`` on the instance as its ``schema`` attribute."""
    self.schema = schema