Module materialize.scalability.operations

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from psycopg import Connection

from materialize.scalability.endpoint import Endpoint
from materialize.scalability.operation import (
    Operation,
    SimpleSqlOperation,
    SqlOperationWithSeed,
    SqlOperationWithTwoSeeds,
)
from materialize.scalability.operation_data import OperationData
from materialize.scalability.schema import Schema


class InsertDefaultValues(SimpleSqlOperation):
    """Insert one row consisting solely of column defaults into ``t1``."""

    def sql_statement(self) -> str:
        """Return the SQL text of this operation."""
        statement = "INSERT INTO t1 DEFAULT VALUES;"
        return statement


class SelectOne(SimpleSqlOperation):
    """Evaluate the constant query ``SELECT 1``, which reads no table."""

    def sql_statement(self) -> str:
        """Return the SQL text of this operation."""
        statement = "SELECT 1;"
        return statement


class SelectStar(SimpleSqlOperation):
    """Read every row and column of ``t1``."""

    def sql_statement(self) -> str:
        """Return the SQL text of this operation."""
        statement = "SELECT * FROM t1;"
        return statement


class SelectLimit(SimpleSqlOperation):
    """Read at most one row of ``t1``."""

    def sql_statement(self) -> str:
        """Return the SQL text of this operation."""
        statement = "SELECT * FROM t1 LIMIT 1;"
        return statement


class SelectCount(SimpleSqlOperation):
    """Count the rows of ``t1``."""

    def sql_statement(self) -> str:
        """Return the SQL text of this operation."""
        statement = "SELECT COUNT(*) FROM t1;"
        return statement


class SelectCountInMv(SimpleSqlOperation):
    """Read the ``count`` column of ``mv1``.

    NOTE(review): ``mv1`` is presumably a materialized view holding a
    precomputed row count; its definition is not part of this module.
    """

    def sql_statement(self) -> str:
        """Return the SQL text of this operation."""
        statement = "SELECT count FROM mv1;"
        return statement


class SelectUnionAll(SimpleSqlOperation):
    """Read ``t1`` twice and combine both results with ``UNION ALL``."""

    def sql_statement(self) -> str:
        """Return the SQL text of this operation."""
        statement = "SELECT * FROM t1 UNION ALL SELECT * FROM t1;"
        return statement


class Update(SimpleSqlOperation):
    """Increment column ``f1`` in every row of ``t1``."""

    def sql_statement(self) -> str:
        """Return the SQL text of this operation."""
        statement = "UPDATE t1 SET f1 = f1 + 1;"
        return statement


class CreateTableX(SqlOperationWithSeed):
    """Create the table ``x_<table_seed>`` with integer columns ``f1``..``f5``."""

    def __init__(self) -> None:
        # Key forwarded to the base class; presumably the operation-data entry
        # that supplies ``table_seed`` -- confirm in SqlOperationWithSeed.
        seed_key = "table_seed"
        super().__init__(seed_key)

    def sql_statement(self, table_seed: str) -> str:
        """Return the ``CREATE TABLE`` statement for the seeded table name."""
        table_name = f"x_{table_seed}"
        columns = "f1 INT, f2 INT, f3 INT, f4 INT, f5 INT"
        return f"CREATE TABLE {table_name} ({columns});"


class CreateIndexOnTableX(SqlOperationWithSeed):
    """Create index ``i_x_<table_seed>`` on column ``f1`` of ``x_<table_seed>``."""

    def __init__(self) -> None:
        # Key forwarded to the base class; presumably the operation-data entry
        # that supplies ``table_seed`` -- confirm in SqlOperationWithSeed.
        seed_key = "table_seed"
        super().__init__(seed_key)

    def sql_statement(self, table_seed: str) -> str:
        """Return the ``CREATE INDEX`` statement for the seeded table name."""
        table_name = f"x_{table_seed}"
        return f"CREATE INDEX i_{table_name} ON {table_name} (f1);"


class CreateMvOnTableX(SqlOperationWithSeed):
    """Create materialized view ``mv_x_<table_seed>`` over ``x_<table_seed>``."""

    def __init__(self) -> None:
        # Key forwarded to the base class; presumably the operation-data entry
        # that supplies ``table_seed`` -- confirm in SqlOperationWithSeed.
        seed_key = "table_seed"
        super().__init__(seed_key)

    def sql_statement(self, table_seed: str) -> str:
        """Return the ``CREATE MATERIALIZED VIEW`` statement for the seed."""
        table_name = f"x_{table_seed}"
        return (
            f"CREATE MATERIALIZED VIEW mv_{table_name} AS SELECT * FROM {table_name};"
        )


class PopulateTableX(SqlOperationWithSeed):
    """Insert 100 rows into ``x_<table_seed>``.

    The first inserted value runs from 1 to 100 (``generate_series``); the
    remaining four values are the constants 200, 300, 400 and 500.
    """

    def __init__(self) -> None:
        # Key forwarded to the base class; presumably the operation-data entry
        # that supplies ``table_seed`` -- confirm in SqlOperationWithSeed.
        seed_key = "table_seed"
        super().__init__(seed_key)

    def sql_statement(self, table_seed: str) -> str:
        """Return the ``INSERT ... SELECT`` statement for the seeded table."""
        table_name = f"x_{table_seed}"
        row_source = "SELECT generate_series(1, 100), 200, 300, 400, 500"
        return f"INSERT INTO {table_name} {row_source};"


class FillColumnInTableX(SqlOperationWithTwoSeeds):
    """Copy ``f1`` into column ``c_<column_seed>`` of table ``x_<table_seed>``.

    NOTE(review): CreateTableX in this module only declares ``f1``..``f5``;
    the ``c_<column_seed>`` column must be added elsewhere -- confirm.
    """

    def __init__(self, column_seed_key: str = "column_seed") -> None:
        # Both keys are forwarded to the base class; presumably they name the
        # operation-data entries supplying the two seeds -- confirm in
        # SqlOperationWithTwoSeeds.
        table_seed_key = "table_seed"
        super().__init__(table_seed_key, column_seed_key)

    def sql_statement(self, table_seed: str, column_seed: str) -> str:
        """Return the ``UPDATE`` statement for the seeded table and column."""
        table_name = f"x_{table_seed}"
        column_name = f"c_{column_seed}"
        return f"UPDATE {table_name} SET {column_name} = f1;"


class DropMvOfTableX(SqlOperationWithSeed):
    """Drop materialized view ``mv_x_<table_seed>`` with ``CASCADE``."""

    def __init__(self) -> None:
        # Key forwarded to the base class; presumably the operation-data entry
        # that supplies ``table_seed`` -- confirm in SqlOperationWithSeed.
        seed_key = "table_seed"
        super().__init__(seed_key)

    def sql_statement(self, table_seed: str) -> str:
        """Return the ``DROP MATERIALIZED VIEW`` statement for the seed."""
        view_name = f"mv_x_{table_seed}"
        return f"DROP MATERIALIZED VIEW {view_name} CASCADE;"


class DropTableX(SqlOperationWithSeed):
    """Drop table ``x_<table_seed>`` with ``CASCADE``."""

    def __init__(self) -> None:
        # Key forwarded to the base class; presumably the operation-data entry
        # that supplies ``table_seed`` -- confirm in SqlOperationWithSeed.
        seed_key = "table_seed"
        super().__init__(seed_key)

    def sql_statement(self, table_seed: str) -> str:
        """Return the ``DROP TABLE`` statement for the seeded table name."""
        table_name = f"x_{table_seed}"
        return f"DROP TABLE {table_name} CASCADE;"


class SelectStarFromTableX(SqlOperationWithSeed):
    """Read every row and column of table ``x_<table_seed>``."""

    def __init__(self) -> None:
        # Key forwarded to the base class; presumably the operation-data entry
        # that supplies ``table_seed`` -- confirm in SqlOperationWithSeed.
        seed_key = "table_seed"
        super().__init__(seed_key)

    def sql_statement(self, table_seed: str) -> str:
        """Return the ``SELECT *`` statement for the seeded table name."""
        table_name = f"x_{table_seed}"
        return f"SELECT * FROM {table_name};"


class SelectStarFromMvOnTableX(SqlOperationWithSeed):
    """Read every row and column of materialized view ``mv_x_<table_seed>``."""

    def __init__(self) -> None:
        # Key forwarded to the base class; presumably the operation-data entry
        # that supplies ``table_seed`` -- confirm in SqlOperationWithSeed.
        seed_key = "table_seed"
        super().__init__(seed_key)

    def sql_statement(self, table_seed: str) -> str:
        """Return the ``SELECT *`` statement for the seeded view name."""
        view_name = f"mv_x_{table_seed}"
        return f"SELECT * FROM {view_name};"


class Connect(Operation):
    """Open a SQL connection to the endpoint and run the schema's connect SQL.

    Reads ``endpoint`` and ``schema`` from the operation data and pushes the
    opened ``connection`` and its ``cursor`` back into it.
    """

    def required_keys(self) -> set[str]:
        """Return the keys this operation reads from the operation data."""
        return {"endpoint", "schema"}

    def produced_keys(self) -> set[str]:
        """Return the keys this operation pushes into the operation data."""
        return {"connection", "cursor"}

    def _execute(self, data: OperationData) -> OperationData:
        """Connect, apply the schema's connect statements, and store the handles.

        The connection is switched to autocommit before any statement runs.
        If preparing the connection fails, it is closed before the exception
        propagates.

        Returns:
            The same ``data`` object, with ``connection`` and ``cursor`` pushed.
        """
        endpoint: Endpoint = data.get("endpoint")
        schema: Schema = data.get("schema")

        connection = endpoint.sql_connection(quiet=True)
        try:
            connection.autocommit = True
            cursor = connection.cursor()

            # this sets the database schema
            for connect_sql in schema.connect_sqls():
                cursor.execute(connect_sql.encode("utf8"))
        except Exception:
            # The connection has not been pushed into ``data`` yet, so nothing
            # downstream could close it; release it here instead of leaking it.
            connection.close()
            raise

        data.push("connection", connection)
        data.push("cursor", cursor)
        return data


class Disconnect(Operation):
    """Close the connection stored under ``connection`` in the operation data."""

    def required_keys(self) -> set[str]:
        """Return the single key this operation reads from the operation data."""
        needed = {"connection"}
        return needed

    def _execute(self, data: OperationData) -> OperationData:
        """Close the stored connection and return the same ``data`` object."""
        open_connection: Connection = data.get("connection")
        open_connection.close()
        return data

Classes

class Connect
Expand source code Browse git
class Connect(Operation):
    """Open a SQL connection to the endpoint and run the schema's connect SQL.

    Reads ``endpoint`` and ``schema`` from the operation data and pushes the
    opened ``connection`` and its ``cursor`` back into it.
    """

    def required_keys(self) -> set[str]:
        """Return the keys this operation reads from the operation data."""
        return {"endpoint", "schema"}

    def produced_keys(self) -> set[str]:
        """Return the keys this operation pushes into the operation data."""
        return {"connection", "cursor"}

    def _execute(self, data: OperationData) -> OperationData:
        """Connect, apply the schema's connect statements, and store the handles.

        The connection is switched to autocommit before any statement runs.
        If preparing the connection fails, it is closed before the exception
        propagates.

        Returns:
            The same ``data`` object, with ``connection`` and ``cursor`` pushed.
        """
        endpoint: Endpoint = data.get("endpoint")
        schema: Schema = data.get("schema")

        connection = endpoint.sql_connection(quiet=True)
        try:
            connection.autocommit = True
            cursor = connection.cursor()

            # this sets the database schema
            for connect_sql in schema.connect_sqls():
                cursor.execute(connect_sql.encode("utf8"))
        except Exception:
            # The connection has not been pushed into ``data`` yet, so nothing
            # downstream could close it; release it here instead of leaking it.
            connection.close()
            raise

        data.push("connection", connection)
        data.push("cursor", cursor)
        return data

Ancestors

Inherited members

class CreateIndexOnTableX
Expand source code Browse git
class CreateIndexOnTableX(SqlOperationWithSeed):
    """Create index ``i_x_<table_seed>`` on column ``f1`` of ``x_<table_seed>``."""

    def __init__(self) -> None:
        # Key forwarded to the base class; presumably the operation-data entry
        # that supplies ``table_seed`` -- confirm in SqlOperationWithSeed.
        seed_key = "table_seed"
        super().__init__(seed_key)

    def sql_statement(self, table_seed: str) -> str:
        """Return the ``CREATE INDEX`` statement for the seeded table name."""
        table_name = f"x_{table_seed}"
        return f"CREATE INDEX i_{table_name} ON {table_name} (f1);"

Ancestors

Methods

def sql_statement(self, table_seed: str) ‑> str
Expand source code Browse git
def sql_statement(self, table_seed: str) -> str:
    """Return the ``CREATE INDEX`` statement for table ``x_<table_seed>``."""
    table_name = f"x_{table_seed}"
    return f"CREATE INDEX i_{table_name} ON {table_name} (f1);"

Inherited members

class CreateMvOnTableX
Expand source code Browse git
class CreateMvOnTableX(SqlOperationWithSeed):
    """Create materialized view ``mv_x_<table_seed>`` over ``x_<table_seed>``."""

    def __init__(self) -> None:
        # Key forwarded to the base class; presumably the operation-data entry
        # that supplies ``table_seed`` -- confirm in SqlOperationWithSeed.
        seed_key = "table_seed"
        super().__init__(seed_key)

    def sql_statement(self, table_seed: str) -> str:
        """Return the ``CREATE MATERIALIZED VIEW`` statement for the seed."""
        table_name = f"x_{table_seed}"
        return (
            f"CREATE MATERIALIZED VIEW mv_{table_name} AS SELECT * FROM {table_name};"
        )

Ancestors

Methods

def sql_statement(self, table_seed: str) ‑> str
Expand source code Browse git
def sql_statement(self, table_seed: str) -> str:
    """Return the ``CREATE MATERIALIZED VIEW`` statement for the seed."""
    table_name = f"x_{table_seed}"
    return f"CREATE MATERIALIZED VIEW mv_{table_name} AS SELECT * FROM {table_name};"

Inherited members

class CreateTableX
Expand source code Browse git
class CreateTableX(SqlOperationWithSeed):
    """Create the table ``x_<table_seed>`` with integer columns ``f1``..``f5``."""

    def __init__(self) -> None:
        # Key forwarded to the base class; presumably the operation-data entry
        # that supplies ``table_seed`` -- confirm in SqlOperationWithSeed.
        seed_key = "table_seed"
        super().__init__(seed_key)

    def sql_statement(self, table_seed: str) -> str:
        """Return the ``CREATE TABLE`` statement for the seeded table name."""
        table_name = f"x_{table_seed}"
        columns = "f1 INT, f2 INT, f3 INT, f4 INT, f5 INT"
        return f"CREATE TABLE {table_name} ({columns});"

Ancestors

Methods

def sql_statement(self, table_seed: str) ‑> str
Expand source code Browse git
def sql_statement(self, table_seed: str) -> str:
    """Return the ``CREATE TABLE`` statement for table ``x_<table_seed>``."""
    table_name = f"x_{table_seed}"
    columns = "f1 INT, f2 INT, f3 INT, f4 INT, f5 INT"
    return f"CREATE TABLE {table_name} ({columns});"

Inherited members

class Disconnect
Expand source code Browse git
class Disconnect(Operation):
    """Close the connection stored under ``connection`` in the operation data."""

    def required_keys(self) -> set[str]:
        """Return the single key this operation reads from the operation data."""
        needed = {"connection"}
        return needed

    def _execute(self, data: OperationData) -> OperationData:
        """Close the stored connection and return the same ``data`` object."""
        open_connection: Connection = data.get("connection")
        open_connection.close()
        return data

Ancestors

Inherited members

class DropMvOfTableX
Expand source code Browse git
class DropMvOfTableX(SqlOperationWithSeed):
    """Drop materialized view ``mv_x_<table_seed>`` with ``CASCADE``."""

    def __init__(self) -> None:
        # Key forwarded to the base class; presumably the operation-data entry
        # that supplies ``table_seed`` -- confirm in SqlOperationWithSeed.
        seed_key = "table_seed"
        super().__init__(seed_key)

    def sql_statement(self, table_seed: str) -> str:
        """Return the ``DROP MATERIALIZED VIEW`` statement for the seed."""
        view_name = f"mv_x_{table_seed}"
        return f"DROP MATERIALIZED VIEW {view_name} CASCADE;"

Ancestors

Methods

def sql_statement(self, table_seed: str) ‑> str
Expand source code Browse git
def sql_statement(self, table_seed: str) -> str:
    """Return the ``DROP MATERIALIZED VIEW`` statement for ``mv_x_<table_seed>``."""
    view_name = f"mv_x_{table_seed}"
    return f"DROP MATERIALIZED VIEW {view_name} CASCADE;"

Inherited members

class DropTableX
Expand source code Browse git
class DropTableX(SqlOperationWithSeed):
    """Drop table ``x_<table_seed>`` with ``CASCADE``."""

    def __init__(self) -> None:
        # Key forwarded to the base class; presumably the operation-data entry
        # that supplies ``table_seed`` -- confirm in SqlOperationWithSeed.
        seed_key = "table_seed"
        super().__init__(seed_key)

    def sql_statement(self, table_seed: str) -> str:
        """Return the ``DROP TABLE`` statement for the seeded table name."""
        table_name = f"x_{table_seed}"
        return f"DROP TABLE {table_name} CASCADE;"

Ancestors

Methods

def sql_statement(self, table_seed: str) ‑> str
Expand source code Browse git
def sql_statement(self, table_seed: str) -> str:
    """Return the ``DROP TABLE`` statement for table ``x_<table_seed>``."""
    table_name = f"x_{table_seed}"
    return f"DROP TABLE {table_name} CASCADE;"

Inherited members

class FillColumnInTableX (column_seed_key: str = 'column_seed')
Expand source code Browse git
class FillColumnInTableX(SqlOperationWithTwoSeeds):
    """Copy ``f1`` into column ``c_<column_seed>`` of table ``x_<table_seed>``.

    NOTE(review): CreateTableX in this module only declares ``f1``..``f5``;
    the ``c_<column_seed>`` column must be added elsewhere -- confirm.
    """

    def __init__(self, column_seed_key: str = "column_seed") -> None:
        # Both keys are forwarded to the base class; presumably they name the
        # operation-data entries supplying the two seeds -- confirm in
        # SqlOperationWithTwoSeeds.
        table_seed_key = "table_seed"
        super().__init__(table_seed_key, column_seed_key)

    def sql_statement(self, table_seed: str, column_seed: str) -> str:
        """Return the ``UPDATE`` statement for the seeded table and column."""
        table_name = f"x_{table_seed}"
        column_name = f"c_{column_seed}"
        return f"UPDATE {table_name} SET {column_name} = f1;"

Ancestors

Methods

def sql_statement(self, table_seed: str, column_seed: str) ‑> str
Expand source code Browse git
def sql_statement(self, table_seed: str, column_seed: str) -> str:
    """Return the ``UPDATE`` copying ``f1`` into ``c_<column_seed>``."""
    table_name = f"x_{table_seed}"
    column_name = f"c_{column_seed}"
    return f"UPDATE {table_name} SET {column_name} = f1;"

Inherited members

class InsertDefaultValues
Expand source code Browse git
class InsertDefaultValues(SimpleSqlOperation):
    """Insert one row consisting solely of column defaults into ``t1``."""

    def sql_statement(self) -> str:
        """Return the SQL text of this operation."""
        statement = "INSERT INTO t1 DEFAULT VALUES;"
        return statement

Ancestors

Methods

def sql_statement(self) ‑> str
Expand source code Browse git
def sql_statement(self) -> str:
    """Return the statement inserting one all-defaults row into ``t1``."""
    statement = "INSERT INTO t1 DEFAULT VALUES;"
    return statement

Inherited members

class PopulateTableX
Expand source code Browse git
class PopulateTableX(SqlOperationWithSeed):
    """Insert 100 rows into ``x_<table_seed>``.

    The first inserted value runs from 1 to 100 (``generate_series``); the
    remaining four values are the constants 200, 300, 400 and 500.
    """

    def __init__(self) -> None:
        # Key forwarded to the base class; presumably the operation-data entry
        # that supplies ``table_seed`` -- confirm in SqlOperationWithSeed.
        seed_key = "table_seed"
        super().__init__(seed_key)

    def sql_statement(self, table_seed: str) -> str:
        """Return the ``INSERT ... SELECT`` statement for the seeded table."""
        table_name = f"x_{table_seed}"
        row_source = "SELECT generate_series(1, 100), 200, 300, 400, 500"
        return f"INSERT INTO {table_name} {row_source};"

Ancestors

Methods

def sql_statement(self, table_seed: str) ‑> str
Expand source code Browse git
def sql_statement(self, table_seed: str) -> str:
    """Return the ``INSERT ... SELECT`` adding 100 rows to ``x_<table_seed>``."""
    table_name = f"x_{table_seed}"
    row_source = "SELECT generate_series(1, 100), 200, 300, 400, 500"
    return f"INSERT INTO {table_name} {row_source};"

Inherited members

class SelectCount
Expand source code Browse git
class SelectCount(SimpleSqlOperation):
    """Count the rows of ``t1``."""

    def sql_statement(self) -> str:
        """Return the SQL text of this operation."""
        statement = "SELECT COUNT(*) FROM t1;"
        return statement

Ancestors

Methods

def sql_statement(self) ‑> str
Expand source code Browse git
def sql_statement(self) -> str:
    """Return the statement counting the rows of ``t1``."""
    statement = "SELECT COUNT(*) FROM t1;"
    return statement

Inherited members

class SelectCountInMv
Expand source code Browse git
class SelectCountInMv(SimpleSqlOperation):
    """Read the ``count`` column of ``mv1``.

    NOTE(review): ``mv1`` is presumably a materialized view holding a
    precomputed row count; its definition is not part of this module.
    """

    def sql_statement(self) -> str:
        """Return the SQL text of this operation."""
        statement = "SELECT count FROM mv1;"
        return statement

Ancestors

Methods

def sql_statement(self) ‑> str
Expand source code Browse git
def sql_statement(self) -> str:
    """Return the statement reading the ``count`` column of ``mv1``."""
    statement = "SELECT count FROM mv1;"
    return statement

Inherited members

class SelectLimit
Expand source code Browse git
class SelectLimit(SimpleSqlOperation):
    """Read at most one row of ``t1``."""

    def sql_statement(self) -> str:
        """Return the SQL text of this operation."""
        statement = "SELECT * FROM t1 LIMIT 1;"
        return statement

Ancestors

Methods

def sql_statement(self) ‑> str
Expand source code Browse git
def sql_statement(self) -> str:
    """Return the statement reading at most one row of ``t1``."""
    statement = "SELECT * FROM t1 LIMIT 1;"
    return statement

Inherited members

class SelectOne
Expand source code Browse git
class SelectOne(SimpleSqlOperation):
    """Evaluate the constant query ``SELECT 1``, which reads no table."""

    def sql_statement(self) -> str:
        """Return the SQL text of this operation."""
        statement = "SELECT 1;"
        return statement

Ancestors

Methods

def sql_statement(self) ‑> str
Expand source code Browse git
def sql_statement(self) -> str:
    """Return the constant query ``SELECT 1``."""
    statement = "SELECT 1;"
    return statement

Inherited members

class SelectStar
Expand source code Browse git
class SelectStar(SimpleSqlOperation):
    """Read every row and column of ``t1``."""

    def sql_statement(self) -> str:
        """Return the SQL text of this operation."""
        statement = "SELECT * FROM t1;"
        return statement

Ancestors

Methods

def sql_statement(self) ‑> str
Expand source code Browse git
def sql_statement(self) -> str:
    """Return the statement reading every row and column of ``t1``."""
    statement = "SELECT * FROM t1;"
    return statement

Inherited members

class SelectStarFromMvOnTableX
Expand source code Browse git
class SelectStarFromMvOnTableX(SqlOperationWithSeed):
    """Read every row and column of materialized view ``mv_x_<table_seed>``."""

    def __init__(self) -> None:
        # Key forwarded to the base class; presumably the operation-data entry
        # that supplies ``table_seed`` -- confirm in SqlOperationWithSeed.
        seed_key = "table_seed"
        super().__init__(seed_key)

    def sql_statement(self, table_seed: str) -> str:
        """Return the ``SELECT *`` statement for the seeded view name."""
        view_name = f"mv_x_{table_seed}"
        return f"SELECT * FROM {view_name};"

Ancestors

Methods

def sql_statement(self, table_seed: str) ‑> str
Expand source code Browse git
def sql_statement(self, table_seed: str) -> str:
    """Return the ``SELECT *`` statement for view ``mv_x_<table_seed>``."""
    view_name = f"mv_x_{table_seed}"
    return f"SELECT * FROM {view_name};"

Inherited members

class SelectStarFromTableX
Expand source code Browse git
class SelectStarFromTableX(SqlOperationWithSeed):
    """Read every row and column of table ``x_<table_seed>``."""

    def __init__(self) -> None:
        # Key forwarded to the base class; presumably the operation-data entry
        # that supplies ``table_seed`` -- confirm in SqlOperationWithSeed.
        seed_key = "table_seed"
        super().__init__(seed_key)

    def sql_statement(self, table_seed: str) -> str:
        """Return the ``SELECT *`` statement for the seeded table name."""
        table_name = f"x_{table_seed}"
        return f"SELECT * FROM {table_name};"

Ancestors

Methods

def sql_statement(self, table_seed: str) ‑> str
Expand source code Browse git
def sql_statement(self, table_seed: str) -> str:
    """Return the ``SELECT *`` statement for table ``x_<table_seed>``."""
    table_name = f"x_{table_seed}"
    return f"SELECT * FROM {table_name};"

Inherited members

class SelectUnionAll
Expand source code Browse git
class SelectUnionAll(SimpleSqlOperation):
    """Read ``t1`` twice and combine both results with ``UNION ALL``."""

    def sql_statement(self) -> str:
        """Return the SQL text of this operation."""
        statement = "SELECT * FROM t1 UNION ALL SELECT * FROM t1;"
        return statement

Ancestors

Methods

def sql_statement(self) ‑> str
Expand source code Browse git
def sql_statement(self) -> str:
    """Return the statement that reads ``t1`` twice via ``UNION ALL``."""
    statement = "SELECT * FROM t1 UNION ALL SELECT * FROM t1;"
    return statement

Inherited members

class Update
Expand source code Browse git
class Update(SimpleSqlOperation):
    """Increment column ``f1`` in every row of ``t1``."""

    def sql_statement(self) -> str:
        """Return the SQL text of this operation."""
        statement = "UPDATE t1 SET f1 = f1 + 1;"
        return statement

Ancestors

Methods

def sql_statement(self) ‑> str
Expand source code Browse git
def sql_statement(self) -> str:
    """Return the statement incrementing ``f1`` in every row of ``t1``."""
    statement = "UPDATE t1 SET f1 = f1 + 1;"
    return statement

Inherited members