Module materialize.scalability.schema

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

from enum import Enum, auto

class Source(Enum):
    TABLE = auto()

class TransactionIsolation(Enum):
    SERIALIZABLE = "serializable"
    STRICT_SERIALIZABLE = "strict serializable"

    def __str__(self) -> str:
        return self.value

class Schema:
    def __init__(
        source: Source = Source.TABLE,
        schema: str = "scalability",
        create_index: bool = True,
        transaction_isolation: TransactionIsolation | None = None,
        cluster_name: str | None = None,
        object_count: int = 1,
    ) -> None:
        self.schema = schema
        self.source = source
        self.create_index = create_index
        self.transaction_isolation = transaction_isolation
        self.cluster_name = cluster_name
        self.object_count = object_count

    def init_sqls(self) -> list[str]:
        init_sqls = self.connect_sqls() + [
            f"DROP SCHEMA IF EXISTS {self.schema} CASCADE;",
            f"CREATE SCHEMA {self.schema};",
            "DROP TABLE IF EXISTS t1;",
        if self.source == Source.TABLE:
            for t in range(1, self.object_count + 1):
                        f"CREATE TABLE t{t} (f1 INTEGER DEFAULT 1);",
                        f"INSERT INTO t{t} DEFAULT VALUES;",
                        f"CREATE OR REPLACE MATERIALIZED VIEW mv{t} AS SELECT count(*) AS count FROM t{t};",

                # Create indexes and wait until they are queryable. Compute can
                # delay execution of dataflows when inputs are not yet
                # available. This would lead to a large outlier on the first
                # query of the actual workload, when it has to wait for the
                # index to be ready.
                if self.create_index:
                    init_sqls.append(f"CREATE INDEX i{t} ON t{t} (f1);")
                    init_sqls.append(f"CREATE INDEX mv_i{t} ON mv{t} (count);")
                    init_sqls.append(f"SELECT f1 from t{t};")
                    init_sqls.append(f"SELECT count from mv{t};")

        return init_sqls

    def connect_sqls(self) -> list[str]:
        init_sqls = [f"SET SCHEMA = {self.schema};"]
        if self.cluster_name is not None:
            init_sqls.append(f"SET CLUSTER = {self.cluster_name};")

        if self.transaction_isolation is not None:
                f"SET transaction_isolation = '{self.transaction_isolation}';"

        return init_sqls


class Schema (source: Source = Source.TABLE, schema: str = 'scalability', create_index: bool = True, transaction_isolation: TransactionIsolation | None = None, cluster_name: str | None = None, object_count: int = 1)
Expand source code Browse git
class Schema:
    def __init__(
        source: Source = Source.TABLE,
        schema: str = "scalability",
        create_index: bool = True,
        transaction_isolation: TransactionIsolation | None = None,
        cluster_name: str | None = None,
        object_count: int = 1,
    ) -> None:
        self.schema = schema
        self.source = source
        self.create_index = create_index
        self.transaction_isolation = transaction_isolation
        self.cluster_name = cluster_name
        self.object_count = object_count

    def init_sqls(self) -> list[str]:
        init_sqls = self.connect_sqls() + [
            f"DROP SCHEMA IF EXISTS {self.schema} CASCADE;",
            f"CREATE SCHEMA {self.schema};",
            "DROP TABLE IF EXISTS t1;",
        if self.source == Source.TABLE:
            for t in range(1, self.object_count + 1):
                        f"CREATE TABLE t{t} (f1 INTEGER DEFAULT 1);",
                        f"INSERT INTO t{t} DEFAULT VALUES;",
                        f"CREATE OR REPLACE MATERIALIZED VIEW mv{t} AS SELECT count(*) AS count FROM t{t};",

                # Create indexes and wait until they are queryable. Compute can
                # delay execution of dataflows when inputs are not yet
                # available. This would lead to a large outlier on the first
                # query of the actual workload, when it has to wait for the
                # index to be ready.
                if self.create_index:
                    init_sqls.append(f"CREATE INDEX i{t} ON t{t} (f1);")
                    init_sqls.append(f"CREATE INDEX mv_i{t} ON mv{t} (count);")
                    init_sqls.append(f"SELECT f1 from t{t};")
                    init_sqls.append(f"SELECT count from mv{t};")

        return init_sqls

    def connect_sqls(self) -> list[str]:
        init_sqls = [f"SET SCHEMA = {self.schema};"]
        if self.cluster_name is not None:
            init_sqls.append(f"SET CLUSTER = {self.cluster_name};")

        if self.transaction_isolation is not None:
                f"SET transaction_isolation = '{self.transaction_isolation}';"

        return init_sqls


def connect_sqls(self) ‑> list[str]
Expand source code Browse git
def connect_sqls(self) -> list[str]:
    init_sqls = [f"SET SCHEMA = {self.schema};"]
    if self.cluster_name is not None:
        init_sqls.append(f"SET CLUSTER = {self.cluster_name};")

    if self.transaction_isolation is not None:
            f"SET transaction_isolation = '{self.transaction_isolation}';"

    return init_sqls
def init_sqls(self) ‑> list[str]
Expand source code Browse git
def init_sqls(self) -> list[str]:
    init_sqls = self.connect_sqls() + [
        f"DROP SCHEMA IF EXISTS {self.schema} CASCADE;",
        f"CREATE SCHEMA {self.schema};",
        "DROP TABLE IF EXISTS t1;",
    if self.source == Source.TABLE:
        for t in range(1, self.object_count + 1):
                    f"CREATE TABLE t{t} (f1 INTEGER DEFAULT 1);",
                    f"INSERT INTO t{t} DEFAULT VALUES;",
                    f"CREATE OR REPLACE MATERIALIZED VIEW mv{t} AS SELECT count(*) AS count FROM t{t};",

            # Create indexes and wait until they are queryable. Compute can
            # delay execution of dataflows when inputs are not yet
            # available. This would lead to a large outlier on the first
            # query of the actual workload, when it has to wait for the
            # index to be ready.
            if self.create_index:
                init_sqls.append(f"CREATE INDEX i{t} ON t{t} (f1);")
                init_sqls.append(f"CREATE INDEX mv_i{t} ON mv{t} (count);")
                init_sqls.append(f"SELECT f1 from t{t};")
                init_sqls.append(f"SELECT count from mv{t};")

    return init_sqls
class Source (*args, **kwds)

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access::
>>> Color.RED
<Color.RED: 1>
  • value lookup:
>>> Color(1)
<Color.RED: 1>
  • name lookup:
>>> Color['RED']
<Color.RED: 1>

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Expand source code Browse git
class Source(Enum):
    TABLE = auto()


  • enum.Enum

Class variables

class TransactionIsolation (*args, **kwds)

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access::
>>> Color.RED
<Color.RED: 1>
  • value lookup:
>>> Color(1)
<Color.RED: 1>
  • name lookup:
>>> Color['RED']
<Color.RED: 1>

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Expand source code Browse git
class TransactionIsolation(Enum):
    SERIALIZABLE = "serializable"
    STRICT_SERIALIZABLE = "strict serializable"

    def __str__(self) -> str:
        return self.value


  • enum.Enum

Class variables