Module materialize.scalability.operation_data
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from typing import Any
from psycopg import Cursor
class OperationData:
    """String-keyed store of values that travel with one cursor and worker id.

    The constructor stores its two arguments under the keys ``"cursor"`` and
    ``"worker_id"``. Further entries are added with :meth:`push` and dropped
    with :meth:`remove`. Nothing prevents ``push``/``remove`` from touching
    those two keys as well.
    """

    def __init__(self, cursor: Cursor, worker_id: int):
        """Create the store, seeded with ``cursor`` and ``worker_id``."""
        self._data: dict[str, Any] = {
            "cursor": cursor,
            "worker_id": worker_id,
        }

    def cursor(self) -> Cursor:
        """Return the value stored under ``"cursor"``.

        Raises:
            KeyError: if the ``"cursor"`` entry has been removed.
        """
        return self._data["cursor"]

    def worker_id(self) -> int:
        """Return the value stored under ``"worker_id"``.

        Raises:
            KeyError: if the ``"worker_id"`` entry has been removed.
        """
        # The constructor stores an ``int`` here, so the return annotation is
        # ``int`` (it was previously mis-annotated as ``Cursor``).
        return self._data["worker_id"]

    def push(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key``, replacing any existing entry."""
        self._data[key] = value

    def remove(self, key: str) -> None:
        """Delete the entry for ``key``; do nothing if it is absent."""
        self._data.pop(key, None)

    def get(self, key: str) -> Any:
        """Return the value stored under ``key``.

        Raises:
            RuntimeError: if no entry exists for ``key``.
        """
        if key not in self._data:
            raise RuntimeError(f"Key does not exist: {key}")
        return self._data[key]

    def validate_requirements(
        self, expected_keys: set[str], required_by: type[Any], requirement: str
    ) -> None:
        """Check that every key in ``expected_keys`` has an entry.

        Args:
            expected_keys: keys that must be present in the store.
            required_by: class whose ``__name__`` opens the error message.
            requirement: text placed between the class name and the missing
                key in the error message.

        Raises:
            RuntimeError: for the first missing key encountered while
                iterating ``expected_keys``.
        """
        for key in expected_keys:
            if key not in self._data:
                raise RuntimeError(
                    f"{required_by.__name__} {requirement} '{key}' but got only: {self._data.keys()}"
                )
Classes
class OperationData (cursor: psycopg.Cursor, worker_id: int)
-
Expand source code Browse git
class OperationData:
    """String-keyed store of values that travel with one cursor and worker id.

    The constructor stores its two arguments under the keys ``"cursor"`` and
    ``"worker_id"``. Further entries are added with :meth:`push` and dropped
    with :meth:`remove`. Nothing prevents ``push``/``remove`` from touching
    those two keys as well.
    """

    def __init__(self, cursor: Cursor, worker_id: int):
        """Create the store, seeded with ``cursor`` and ``worker_id``."""
        self._data: dict[str, Any] = {
            "cursor": cursor,
            "worker_id": worker_id,
        }

    def cursor(self) -> Cursor:
        """Return the value stored under ``"cursor"``.

        Raises:
            KeyError: if the ``"cursor"`` entry has been removed.
        """
        return self._data["cursor"]

    def worker_id(self) -> int:
        """Return the value stored under ``"worker_id"``.

        Raises:
            KeyError: if the ``"worker_id"`` entry has been removed.
        """
        # The constructor stores an ``int`` here, so the return annotation is
        # ``int`` (it was previously mis-annotated as ``Cursor``).
        return self._data["worker_id"]

    def push(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key``, replacing any existing entry."""
        self._data[key] = value

    def remove(self, key: str) -> None:
        """Delete the entry for ``key``; do nothing if it is absent."""
        self._data.pop(key, None)

    def get(self, key: str) -> Any:
        """Return the value stored under ``key``.

        Raises:
            RuntimeError: if no entry exists for ``key``.
        """
        if key not in self._data:
            raise RuntimeError(f"Key does not exist: {key}")
        return self._data[key]

    def validate_requirements(
        self, expected_keys: set[str], required_by: type[Any], requirement: str
    ) -> None:
        """Check that every key in ``expected_keys`` has an entry.

        Args:
            expected_keys: keys that must be present in the store.
            required_by: class whose ``__name__`` opens the error message.
            requirement: text placed between the class name and the missing
                key in the error message.

        Raises:
            RuntimeError: for the first missing key encountered while
                iterating ``expected_keys``.
        """
        for key in expected_keys:
            if key not in self._data:
                raise RuntimeError(
                    f"{required_by.__name__} {requirement} '{key}' but got only: {self._data.keys()}"
                )
Methods
def cursor(self) ‑> psycopg.Cursor
-
Expand source code Browse git
def cursor(self) -> Cursor:
    """Return the value stored under the ``"cursor"`` key.

    Raises:
        KeyError: if there is no ``"cursor"`` entry.
    """
    entries = self._data
    return entries["cursor"]
def get(self, key: str) ‑> Any
-
Expand source code Browse git
def get(self, key: str) -> Any:
    """Return the value stored under ``key``.

    Raises:
        RuntimeError: if no entry exists for ``key``.
    """
    # Membership is tested on the dict directly; going through ``.keys()``
    # adds nothing.
    if key not in self._data:
        raise RuntimeError(f"Key does not exist: {key}")
    return self._data[key]
def push(self, key: str, value: Any) ‑> None
-
Expand source code Browse git
def push(self, key: str, value: Any) -> None:
    """Store ``value`` under ``key``, replacing any existing entry."""
    entries = self._data
    entries[key] = value
def remove(self, key: str) ‑> None
-
Expand source code Browse git
def remove(self, key: str) -> None:
    """Delete the entry for ``key``; do nothing if it is absent."""
    entries = self._data
    if key in entries:
        del entries[key]
def validate_requirements(self, expected_keys: set[str], required_by: type[typing.Any], requirement: str) ‑> None
-
Expand source code Browse git
def validate_requirements(
    self, expected_keys: set[str], required_by: type[Any], requirement: str
) -> None:
    """Check that every key in ``expected_keys`` has an entry.

    Args:
        expected_keys: keys that must be present in the store.
        required_by: class whose ``__name__`` opens the error message.
        requirement: text placed between the class name and the missing
            key in the error message.

    Raises:
        RuntimeError: for the first missing key encountered while
            iterating ``expected_keys``.
    """
    for key in expected_keys:
        # Membership is tested on the dict directly; the ``.keys()`` view is
        # only kept inside the message so its text stays unchanged.
        if key not in self._data:
            raise RuntimeError(
                f"{required_by.__name__} {requirement} '{key}' but got only: {self._data.keys()}"
            )
def worker_id(self) ‑> int
-
Expand source code Browse git
def worker_id(self) -> int:
    """Return the value stored under the ``"worker_id"`` key.

    Raises:
        KeyError: if there is no ``"worker_id"`` entry.
    """
    # The constructor stores its ``worker_id: int`` argument under this key,
    # so the return annotation is ``int`` rather than ``Cursor``.
    return self._data["worker_id"]