Module materialize.optbench.sql

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import logging
import re
import ssl
from enum import Enum
from pathlib import Path
from typing import Any, cast

import numpy as np
import pg8000
import sqlparse

from . import Scenario, util

class Dialect(Enum):
    """SQL dialects that the helpers in this module distinguish between."""

    # Selected when the server's version string does not mention "Materialize"
    # (presumably plain PostgreSQL -- confirm against callers).
    PG = 0
    # Selected when the server's version string contains "Materialize".
    MZ = 1

class Query:
    """An API for manipulating workload queries."""

    def __init__(self, query: str) -> None:
        """Wrap the raw SQL text of a single workload query."""
        self.query = query

    def __str__(self) -> str:
        """Return the raw SQL text."""
        return self.query

    def name(self) -> str:
        """Extract the name of this query from a '-- name: {name}' comment.

        The name is everything after '-- name: ' up to the end of that line.

        Returns:
            The captured name, or 'anonymous' if no such comment is present.
        """
        p = r"-- name\: (?P<name>.+)"
        m = re.search(p, self.query, re.MULTILINE)
        # The fallback is spelled as documented ('anonymous').
        return m.group("name") if m else "anonymous"

    def explain(self, timing: bool, dialect: Dialect = Dialect.MZ) -> str:
        """Prepend 'EXPLAIN ...' to the query respecting the given dialect.

        Args:
            timing: Whether to request timing information in the plan output.
            dialect: The SQL dialect of the target server.

        Returns:
            The EXPLAIN clause and the query text, joined by a newline.
        """
        if dialect == Dialect.PG:
            prefix = "EXPLAIN (ANALYZE, TIMING TRUE)" if timing else "EXPLAIN"
        else:
            prefix = "EXPLAIN WITH(timing)" if timing else "EXPLAIN"
        return "\n".join([prefix, self.query])

class ExplainOutput:
    """An API for manipulating 'EXPLAIN ... PLAN FOR' results."""

    def __init__(self, output: str) -> None:
        self.output = output

    def __str__(self) -> str:
        return self.output

    def optimization_time(self) -> np.timedelta64 | None:
        """Optionally, returns the optimization_time time for an 'EXPLAIN' output."""
        p = r"(Optimization time|Planning Time)\: (?P<time>[0-9]+(\.[0-9]+)?\s?\S+)"
        m =, self.output, re.MULTILINE)
        return util.duration_to_timedelta(m["time"]) if m else None

class Database:
    """An API to the database under test.

    Holds a single `pg8000` connection in autocommit mode and records which
    SQL dialect the server speaks.
    """

    def __init__(
        self,
        port: int,
        host: str,
        user: str,
        password: str | None,
        database: str | None,
        require_ssl: bool,
    ) -> None:
        """Connect to the server and detect its dialect.

        Args:
            port: TCP port of the server.
            host: Host name of the server.
            user: User to connect as.
            password: Password for `user`, or None.
            database: Database to connect to, or None.
            require_ssl: When true, connect with a default SSL context;
                otherwise connect without one.
        """
        logging.debug(f"Initialize Database with host={host} port={port}, user={user}")

        if require_ssl:
            # verify_mode=ssl.CERT_REQUIRED is the default
            ssl_context = ssl.create_default_context()
        else:
            ssl_context = None

        self.conn = pg8000.connect(
            host=host,
            port=port,
            user=user,
            password=password,
            database=database,
            ssl_context=ssl_context,
        )
        self.conn.autocommit = True
        # The dialect is derived from the server's own version string.
        self.dialect = Dialect.MZ if "Materialize" in self.version() else Dialect.PG

    def close(self) -> None:
        """Close the underlying connection."""
        self.conn.close()

    def version(self) -> str:
        """Return the result of `SELECT version()`."""
        result = self.query_one("SELECT version()")
        return cast(str, result[0])

    def mz_version(self) -> str | None:
        """Return the result of `SELECT mz_version()`, or None for non-MZ servers."""
        if self.dialect == Dialect.MZ:
            result = self.query_one("SELECT mz_version()")
            return cast(str, result[0])
        else:
            return None

    def drop_database(self, scenario: Scenario) -> None:
        """Drop the database named after `scenario` if it exists."""
        logging.debug(f'Drop database "{scenario}"')
        self.execute(f"DROP DATABASE IF EXISTS {scenario}")

    def create_database(self, scenario: Scenario) -> None:
        """Create a database named after `scenario`."""
        logging.debug(f'Create database "{scenario}"')
        self.execute(f"CREATE DATABASE {scenario}")

    def explain(self, query: Query, timing: bool) -> "ExplainOutput":
        """Run EXPLAIN for `query` in this server's dialect.

        All columns of all result rows are joined with newlines into a single
        string, which is wrapped in an `ExplainOutput`.
        """
        result = self.query_all(query.explain(timing, self.dialect))
        return ExplainOutput("\n".join([col for line in result for col in line]))

    def execute(self, statement: str) -> None:
        """Execute a single statement, discarding any result."""
        with self.conn.cursor() as cursor:
            cursor.execute(statement)

    def execute_all(self, statements: list[str]) -> None:
        """Execute the given statements in order on one cursor."""
        with self.conn.cursor() as cursor:
            for statement in statements:
                cursor.execute(statement)

    def query_one(self, query: str) -> dict[Any, Any]:
        """Execute `query` and return what `cursor.fetchone()` yields.

        NOTE(review): the return annotation says dict, but callers in this
        class index the result by position -- confirm the intended type.
        """
        with self.conn.cursor() as cursor:
            cursor.execute(query)
            return cast(dict[Any, Any], cursor.fetchone())

    def query_all(self, query: str) -> dict[Any, Any]:
        """Execute `query` and return what `cursor.fetchall()` yields.

        NOTE(review): the return annotation says dict, but `explain` iterates
        the result as rows of columns -- confirm the intended type.
        """
        with self.conn.cursor() as cursor:
            cursor.execute(query)
            return cast(dict[Any, Any], cursor.fetchall())

# Utility functions
# -----------------

def parse_from_file(path: Path) -> list[str]:
    """Read the *.sql file at `path` and split its text into a list of queries."""
    sql_text = path.read_text()
    statements = sqlparse.split(sql_text)
    return statements


def parse_from_file(path: pathlib.Path) ‑> list[str]

Parses a *.sql file to a list of queries.

Expand source code Browse git
def parse_from_file(path: Path) -> list[str]:
    """Read the *.sql file at `path` and split its text into a list of queries."""
    sql_text = path.read_text()
    statements = sqlparse.split(sql_text)
    return statements


class Database (port: int, host: str, user: str, password: str | None, database: str | None, require_ssl: bool)

An API to the database under test.

Expand source code Browse git
class Database:
    """An API to the database under test.

    Holds a single `pg8000` connection in autocommit mode and records which
    SQL dialect the server speaks.
    """

    def __init__(
        self,
        port: int,
        host: str,
        user: str,
        password: str | None,
        database: str | None,
        require_ssl: bool,
    ) -> None:
        """Connect to the server and detect its dialect.

        Args:
            port: TCP port of the server.
            host: Host name of the server.
            user: User to connect as.
            password: Password for `user`, or None.
            database: Database to connect to, or None.
            require_ssl: When true, connect with a default SSL context;
                otherwise connect without one.
        """
        logging.debug(f"Initialize Database with host={host} port={port}, user={user}")

        if require_ssl:
            # verify_mode=ssl.CERT_REQUIRED is the default
            ssl_context = ssl.create_default_context()
        else:
            ssl_context = None

        self.conn = pg8000.connect(
            host=host,
            port=port,
            user=user,
            password=password,
            database=database,
            ssl_context=ssl_context,
        )
        self.conn.autocommit = True
        # The dialect is derived from the server's own version string.
        self.dialect = Dialect.MZ if "Materialize" in self.version() else Dialect.PG

    def close(self) -> None:
        """Close the underlying connection."""
        self.conn.close()

    def version(self) -> str:
        """Return the result of `SELECT version()`."""
        result = self.query_one("SELECT version()")
        return cast(str, result[0])

    def mz_version(self) -> str | None:
        """Return the result of `SELECT mz_version()`, or None for non-MZ servers."""
        if self.dialect == Dialect.MZ:
            result = self.query_one("SELECT mz_version()")
            return cast(str, result[0])
        else:
            return None

    def drop_database(self, scenario: Scenario) -> None:
        """Drop the database named after `scenario` if it exists."""
        logging.debug(f'Drop database "{scenario}"')
        self.execute(f"DROP DATABASE IF EXISTS {scenario}")

    def create_database(self, scenario: Scenario) -> None:
        """Create a database named after `scenario`."""
        logging.debug(f'Create database "{scenario}"')
        self.execute(f"CREATE DATABASE {scenario}")

    def explain(self, query: Query, timing: bool) -> "ExplainOutput":
        """Run EXPLAIN for `query` in this server's dialect.

        All columns of all result rows are joined with newlines into a single
        string, which is wrapped in an `ExplainOutput`.
        """
        result = self.query_all(query.explain(timing, self.dialect))
        return ExplainOutput("\n".join([col for line in result for col in line]))

    def execute(self, statement: str) -> None:
        """Execute a single statement, discarding any result."""
        with self.conn.cursor() as cursor:
            cursor.execute(statement)

    def execute_all(self, statements: list[str]) -> None:
        """Execute the given statements in order on one cursor."""
        with self.conn.cursor() as cursor:
            for statement in statements:
                cursor.execute(statement)

    def query_one(self, query: str) -> dict[Any, Any]:
        """Execute `query` and return what `cursor.fetchone()` yields.

        NOTE(review): the return annotation says dict, but callers in this
        class index the result by position -- confirm the intended type.
        """
        with self.conn.cursor() as cursor:
            cursor.execute(query)
            return cast(dict[Any, Any], cursor.fetchone())

    def query_all(self, query: str) -> dict[Any, Any]:
        """Execute `query` and return what `cursor.fetchall()` yields.

        NOTE(review): the return annotation says dict, but `explain` iterates
        the result as rows of columns -- confirm the intended type.
        """
        with self.conn.cursor() as cursor:
            cursor.execute(query)
            return cast(dict[Any, Any], cursor.fetchall())


def close(self) ‑> None
Expand source code Browse git
def close(self) -> None:
    """Close the underlying database connection held in `self.conn`."""
    self.conn.close()
def create_database(self, scenario: Scenario) ‑> None
Expand source code Browse git
def create_database(self, scenario: Scenario) -> None:
    """Log the request, then issue CREATE DATABASE for `scenario`."""
    logging.debug(f'Create database "{scenario}"')
    statement = f"CREATE DATABASE {scenario}"
    self.execute(statement)
def drop_database(self, scenario: Scenario) ‑> None
Expand source code Browse git
def drop_database(self, scenario: Scenario) -> None:
    """Log the request, then issue DROP DATABASE IF EXISTS for `scenario`."""
    logging.debug(f'Drop database "{scenario}"')
    statement = f"DROP DATABASE IF EXISTS {scenario}"
    self.execute(statement)
def execute(self, statement: str) ‑> None
Expand source code Browse git
def execute(self, statement: str) -> None:
    """Execute a single statement on a fresh cursor, discarding any result."""
    with self.conn.cursor() as cursor:
        cursor.execute(statement)
def execute_all(self, statements: list[str]) ‑> None
Expand source code Browse git
def execute_all(self, statements: list[str]) -> None:
    """Execute the given statements in order, all on the same cursor."""
    with self.conn.cursor() as cursor:
        for statement in statements:
            cursor.execute(statement)
def explain(self, query: Query, timing: bool) ‑> ExplainOutput
Expand source code Browse git
def explain(self, query: Query, timing: bool) -> "ExplainOutput":
    """Run EXPLAIN for `query` in this connection's dialect.

    Every column of every returned row becomes one line of the resulting
    `ExplainOutput`.
    """
    explain_sql = query.explain(timing, self.dialect)
    rows = self.query_all(explain_sql)
    columns = []
    for row in rows:
        columns.extend(row)
    return ExplainOutput("\n".join(columns))
def mz_version(self) ‑> str | None
Expand source code Browse git
def mz_version(self) -> str | None:
    """Return the result of `SELECT mz_version()`, or None for non-MZ dialects."""
    if self.dialect != Dialect.MZ:
        return None
    result = self.query_one("SELECT mz_version()")
    return cast(str, result[0])
def query_all(self, query: str) ‑> dict[typing.Any, typing.Any]
Expand source code Browse git
def query_all(self, query: str) -> dict[Any, Any]:
    """Execute `query` on a fresh cursor and return what `fetchall()` yields.

    NOTE(review): the return annotation says dict although the value comes
    straight from `cursor.fetchall()` -- confirm the intended type.
    """
    with self.conn.cursor() as cursor:
        cursor.execute(query)
        return cast(dict[Any, Any], cursor.fetchall())
def query_one(self, query: str) ‑> dict[typing.Any, typing.Any]
Expand source code Browse git
def query_one(self, query: str) -> dict[Any, Any]:
    """Execute `query` on a fresh cursor and return what `fetchone()` yields.

    NOTE(review): the return annotation says dict although the value comes
    straight from `cursor.fetchone()` -- confirm the intended type.
    """
    with self.conn.cursor() as cursor:
        cursor.execute(query)
        return cast(dict[Any, Any], cursor.fetchone())
def version(self) ‑> str
Expand source code Browse git
def version(self) -> str:
    """Return the first column of the row produced by `SELECT version()`."""
    row = self.query_one("SELECT version()")
    version_text = row[0]
    return cast(str, version_text)
class Dialect (*args, **kwds)

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access:
>>> Color.RED
<Color.RED: 1>
  • value lookup:
>>> Color(1)
<Color.RED: 1>
  • name lookup:
>>> Color['RED']
<Color.RED: 1>

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Expand source code Browse git
class Dialect(Enum):
    """SQL dialects that the helpers in this module distinguish between."""

    # Presumably plain PostgreSQL -- confirm against callers.
    PG = 0
    # Presumably Materialize -- confirm against callers.
    MZ = 1


  • enum.Enum

Class variables

var MZ
var PG
class ExplainOutput (output: str)

An API for manipulating 'EXPLAIN … PLAN FOR' results.

Expand source code Browse git
class ExplainOutput:
    """An API for manipulating 'EXPLAIN ... PLAN FOR' results."""

    def __init__(self, output: str) -> None:
        self.output = output

    def __str__(self) -> str:
        return self.output

    def optimization_time(self) -> np.timedelta64 | None:
        """Optionally, returns the optimization_time time for an 'EXPLAIN' output."""
        p = r"(Optimization time|Planning Time)\: (?P<time>[0-9]+(\.[0-9]+)?\s?\S+)"
        m =, self.output, re.MULTILINE)
        return util.duration_to_timedelta(m["time"]) if m else None


def optimization_time(self) ‑> numpy.timedelta64 | None

Optionally, returns the optimization time for an 'EXPLAIN' output.

Expand source code Browse git
def optimization_time(self) -> np.timedelta64 | None:
    """Optionally, returns the optimization_time time for an 'EXPLAIN' output."""
    p = r"(Optimization time|Planning Time)\: (?P<time>[0-9]+(\.[0-9]+)?\s?\S+)"
    m =, self.output, re.MULTILINE)
    return util.duration_to_timedelta(m["time"]) if m else None
class Query (query: str)

An API for manipulating workload queries.

Expand source code Browse git
class Query:
    """An API for manipulating workload queries."""

    def __init__(self, query: str) -> None:
        """Wrap the raw SQL text of a single workload query."""
        self.query = query

    def __str__(self) -> str:
        """Return the raw SQL text."""
        return self.query

    def name(self) -> str:
        """Extract the name of this query from a '-- name: {name}' comment.

        The name is everything after '-- name: ' up to the end of that line.

        Returns:
            The captured name, or 'anonymous' if no such comment is present.
        """
        p = r"-- name\: (?P<name>.+)"
        m = re.search(p, self.query, re.MULTILINE)
        # The fallback is spelled as documented ('anonymous').
        return m.group("name") if m else "anonymous"

    def explain(self, timing: bool, dialect: Dialect = Dialect.MZ) -> str:
        """Prepend 'EXPLAIN ...' to the query respecting the given dialect.

        Args:
            timing: Whether to request timing information in the plan output.
            dialect: The SQL dialect of the target server.

        Returns:
            The EXPLAIN clause and the query text, joined by a newline.
        """
        if dialect == Dialect.PG:
            prefix = "EXPLAIN (ANALYZE, TIMING TRUE)" if timing else "EXPLAIN"
        else:
            prefix = "EXPLAIN WITH(timing)" if timing else "EXPLAIN"
        return "\n".join([prefix, self.query])


def explain(self, timing: bool, dialect: Dialect = Dialect.MZ) ‑> str

Prepends 'EXPLAIN …' to the query respecting the given dialect.

Expand source code Browse git
def explain(self, timing: bool, dialect: Dialect = Dialect.MZ) -> str:
    """Prepend 'EXPLAIN ...' to the query respecting the given dialect.

    Args:
        timing: Whether to request timing information in the plan output.
        dialect: The SQL dialect of the target server.

    Returns:
        The EXPLAIN clause and the query text, joined by a newline.
    """
    if dialect == Dialect.PG:
        if timing:
            return "\n".join(["EXPLAIN (ANALYZE, TIMING TRUE)", self.query])
        else:
            return "\n".join(["EXPLAIN", self.query])
    else:
        if timing:
            return "\n".join(["EXPLAIN WITH(timing)", self.query])
        else:
            return "\n".join(["EXPLAIN", self.query])
def name(self) ‑> str

Extracts and returns the name of this query from a '-- name: {name}' comment. Returns 'anonymous' if the name is not set.

Expand source code Browse git
def name(self) -> str:
    """Extract the name of this query from a '-- name: {name}' comment.

    The name is everything after '-- name: ' up to the end of that line.

    Returns:
        The captured name, or 'anonymous' if no such comment is present.
    """
    p = r"-- name\: (?P<name>.+)"
    m = re.search(p, self.query, re.MULTILINE)
    # The fallback is spelled as documented ('anonymous').
    return m.group("name") if m else "anonymous"