misc.python.materialize.optbench.sql

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10import logging
 11import re
 12from enum import Enum
 13from pathlib import Path
 14from typing import Any, cast
 15
 16import numpy as np
 17import psycopg
 18import sqlparse
 19
 20from . import Scenario, util
 21
 22
 23class Dialect(Enum):
 24    PG = 0
 25    MZ = 1
 26
 27
 28class Query:
 29    """An API for manipulating workload queries."""
 30
 31    def __init__(self, query: str) -> None:
 32        self.query = query
 33
 34    def __str__(self) -> str:
 35        return self.query
 36
 37    def name(self) -> str:
 38        """Extracts and returns the name of this query from a '-- name: {name}' comment.
 39        Returns 'anonymous' if the name is not set."""
 40        p = r"-- name\: (?P<name>.+)"
 41        m = re.search(p, self.query, re.MULTILINE)
 42        return m.group("name") if m else "anonoymous"
 43
 44    def explain(self, timing: bool, dialect: Dialect = Dialect.MZ) -> str:
 45        """Prepends 'EXPLAIN ...' to the query respecting the given dialect."""
 46
 47        if dialect == Dialect.PG:
 48            if timing:
 49                return "\n".join(["EXPLAIN (ANALYZE, TIMING TRUE)", self.query])
 50            else:
 51                return "\n".join(["EXPLAIN", self.query])
 52        else:
 53            if timing:
 54                return "\n".join(["EXPLAIN WITH(timing)", self.query])
 55            else:
 56                return "\n".join(["EXPLAIN", self.query])
 57
 58
 59class ExplainOutput:
 60    """An API for manipulating 'EXPLAIN ... PLAN FOR' results."""
 61
 62    def __init__(self, output: str) -> None:
 63        self.output = output
 64
 65    def __str__(self) -> str:
 66        return self.output
 67
 68    def optimization_time(self) -> np.timedelta64 | None:
 69        """Optionally, returns the optimization_time time for an 'EXPLAIN' output."""
 70        p = r"(Optimization time|Planning Time)\: (?P<time>[0-9]+(\.[0-9]+)?\s?\S+)"
 71        m = re.search(p, self.output, re.MULTILINE)
 72        return util.duration_to_timedelta(m["time"]) if m else None
 73
 74
 75class Database:
 76    """An API to the database under test."""
 77
 78    def __init__(
 79        self,
 80        port: int,
 81        host: str,
 82        user: str,
 83        password: str | None,
 84        database: str | None,
 85        require_ssl: bool,
 86    ) -> None:
 87        logging.debug(f"Initialize Database with host={host} port={port}, user={user}")
 88
 89        self.conn = psycopg.connect(
 90            host=host,
 91            port=port,
 92            user=user,
 93            password=password,
 94            dbname=database,
 95            sslmode="require" if require_ssl else "disable",
 96        )
 97        self.conn.autocommit = True
 98        self.dialect = Dialect.MZ if "Materialize" in self.version() else Dialect.PG
 99
100    def close(self) -> None:
101        self.conn.close()
102
103    def version(self) -> str:
104        result = self.query_one("SELECT version()")
105        return cast(str, result[0])
106
107    def mz_version(self) -> str | None:
108        if self.dialect == Dialect.MZ:
109            result = self.query_one("SELECT mz_version()")
110            return cast(str, result[0])
111        else:
112            return None
113
114    def drop_database(self, scenario: Scenario) -> None:
115        logging.debug(f'Drop database "{scenario}"')
116        self.execute(f"DROP DATABASE IF EXISTS {scenario}")
117
118    def create_database(self, scenario: Scenario) -> None:
119        logging.debug(f'Create database "{scenario}"')
120        self.execute(f"CREATE DATABASE {scenario}")
121
122    def explain(self, query: Query, timing: bool) -> "ExplainOutput":
123        result = self.query_all(query.explain(timing, self.dialect))
124        return ExplainOutput("\n".join([col for line in result for col in line]))
125
126    def execute(self, statement: str) -> None:
127        with self.conn.cursor() as cursor:
128            cursor.execute(statement.encode())
129
130    def execute_all(self, statements: list[str]) -> None:
131        with self.conn.cursor() as cursor:
132            for statement in statements:
133                cursor.execute(statement.encode())
134
135    def query_one(self, query: str) -> dict[Any, Any]:
136        with self.conn.cursor() as cursor:
137            cursor.execute(query.encode())
138            return cast(dict[Any, Any], cursor.fetchone())
139
140    def query_all(self, query: str) -> dict[Any, Any]:
141        with self.conn.cursor() as cursor:
142            cursor.execute(query.encode())
143            return cast(dict[Any, Any], cursor.fetchall())
144
145
146# Utility functions
147# -----------------
148
149
def parse_from_file(path: Path) -> list[str]:
    """Read the *.sql file at `path` and split its text into a list of queries."""
    sql_text = path.read_text()
    return sqlparse.split(sql_text)
class Dialect(enum.Enum):
24class Dialect(Enum):
25    PG = 0
26    MZ = 1
PG = <Dialect.PG: 0>
MZ = <Dialect.MZ: 1>
class Query:
29class Query:
30    """An API for manipulating workload queries."""
31
32    def __init__(self, query: str) -> None:
33        self.query = query
34
35    def __str__(self) -> str:
36        return self.query
37
38    def name(self) -> str:
39        """Extracts and returns the name of this query from a '-- name: {name}' comment.
40        Returns 'anonymous' if the name is not set."""
41        p = r"-- name\: (?P<name>.+)"
42        m = re.search(p, self.query, re.MULTILINE)
43        return m.group("name") if m else "anonoymous"
44
45    def explain(self, timing: bool, dialect: Dialect = Dialect.MZ) -> str:
46        """Prepends 'EXPLAIN ...' to the query respecting the given dialect."""
47
48        if dialect == Dialect.PG:
49            if timing:
50                return "\n".join(["EXPLAIN (ANALYZE, TIMING TRUE)", self.query])
51            else:
52                return "\n".join(["EXPLAIN", self.query])
53        else:
54            if timing:
55                return "\n".join(["EXPLAIN WITH(timing)", self.query])
56            else:
57                return "\n".join(["EXPLAIN", self.query])

An API for manipulating workload queries.

Query(query: str)
32    def __init__(self, query: str) -> None:
33        self.query = query
query
def name(self) -> str:
38    def name(self) -> str:
39        """Extracts and returns the name of this query from a '-- name: {name}' comment.
40        Returns 'anonymous' if the name is not set."""
41        p = r"-- name\: (?P<name>.+)"
42        m = re.search(p, self.query, re.MULTILINE)
43        return m.group("name") if m else "anonoymous"

Extracts and returns the name of this query from a '-- name: {name}' comment. Returns 'anonymous' if the name is not set.

def explain( self, timing: bool, dialect: Dialect = <Dialect.MZ: 1>) -> str:
45    def explain(self, timing: bool, dialect: Dialect = Dialect.MZ) -> str:
46        """Prepends 'EXPLAIN ...' to the query respecting the given dialect."""
47
48        if dialect == Dialect.PG:
49            if timing:
50                return "\n".join(["EXPLAIN (ANALYZE, TIMING TRUE)", self.query])
51            else:
52                return "\n".join(["EXPLAIN", self.query])
53        else:
54            if timing:
55                return "\n".join(["EXPLAIN WITH(timing)", self.query])
56            else:
57                return "\n".join(["EXPLAIN", self.query])

Prepends 'EXPLAIN ...' to the query respecting the given dialect.

class ExplainOutput:
60class ExplainOutput:
61    """An API for manipulating 'EXPLAIN ... PLAN FOR' results."""
62
63    def __init__(self, output: str) -> None:
64        self.output = output
65
66    def __str__(self) -> str:
67        return self.output
68
69    def optimization_time(self) -> np.timedelta64 | None:
70        """Optionally, returns the optimization_time time for an 'EXPLAIN' output."""
71        p = r"(Optimization time|Planning Time)\: (?P<time>[0-9]+(\.[0-9]+)?\s?\S+)"
72        m = re.search(p, self.output, re.MULTILINE)
73        return util.duration_to_timedelta(m["time"]) if m else None

An API for manipulating 'EXPLAIN ... PLAN FOR' results.

ExplainOutput(output: str)
63    def __init__(self, output: str) -> None:
64        self.output = output
output
def optimization_time(self) -> numpy.timedelta64 | None:
69    def optimization_time(self) -> np.timedelta64 | None:
70        """Optionally, returns the optimization_time time for an 'EXPLAIN' output."""
71        p = r"(Optimization time|Planning Time)\: (?P<time>[0-9]+(\.[0-9]+)?\s?\S+)"
72        m = re.search(p, self.output, re.MULTILINE)
73        return util.duration_to_timedelta(m["time"]) if m else None

Optionally, returns the optimization time for an 'EXPLAIN' output.

class Database:
 76class Database:
 77    """An API to the database under test."""
 78
 79    def __init__(
 80        self,
 81        port: int,
 82        host: str,
 83        user: str,
 84        password: str | None,
 85        database: str | None,
 86        require_ssl: bool,
 87    ) -> None:
 88        logging.debug(f"Initialize Database with host={host} port={port}, user={user}")
 89
 90        self.conn = psycopg.connect(
 91            host=host,
 92            port=port,
 93            user=user,
 94            password=password,
 95            dbname=database,
 96            sslmode="require" if require_ssl else "disable",
 97        )
 98        self.conn.autocommit = True
 99        self.dialect = Dialect.MZ if "Materialize" in self.version() else Dialect.PG
100
101    def close(self) -> None:
102        self.conn.close()
103
104    def version(self) -> str:
105        result = self.query_one("SELECT version()")
106        return cast(str, result[0])
107
108    def mz_version(self) -> str | None:
109        if self.dialect == Dialect.MZ:
110            result = self.query_one("SELECT mz_version()")
111            return cast(str, result[0])
112        else:
113            return None
114
115    def drop_database(self, scenario: Scenario) -> None:
116        logging.debug(f'Drop database "{scenario}"')
117        self.execute(f"DROP DATABASE IF EXISTS {scenario}")
118
119    def create_database(self, scenario: Scenario) -> None:
120        logging.debug(f'Create database "{scenario}"')
121        self.execute(f"CREATE DATABASE {scenario}")
122
123    def explain(self, query: Query, timing: bool) -> "ExplainOutput":
124        result = self.query_all(query.explain(timing, self.dialect))
125        return ExplainOutput("\n".join([col for line in result for col in line]))
126
127    def execute(self, statement: str) -> None:
128        with self.conn.cursor() as cursor:
129            cursor.execute(statement.encode())
130
131    def execute_all(self, statements: list[str]) -> None:
132        with self.conn.cursor() as cursor:
133            for statement in statements:
134                cursor.execute(statement.encode())
135
136    def query_one(self, query: str) -> dict[Any, Any]:
137        with self.conn.cursor() as cursor:
138            cursor.execute(query.encode())
139            return cast(dict[Any, Any], cursor.fetchone())
140
141    def query_all(self, query: str) -> dict[Any, Any]:
142        with self.conn.cursor() as cursor:
143            cursor.execute(query.encode())
144            return cast(dict[Any, Any], cursor.fetchall())

An API to the database under test.

Database( port: int, host: str, user: str, password: str | None, database: str | None, require_ssl: bool)
79    def __init__(
80        self,
81        port: int,
82        host: str,
83        user: str,
84        password: str | None,
85        database: str | None,
86        require_ssl: bool,
87    ) -> None:
88        logging.debug(f"Initialize Database with host={host} port={port}, user={user}")
89
90        self.conn = psycopg.connect(
91            host=host,
92            port=port,
93            user=user,
94            password=password,
95            dbname=database,
96            sslmode="require" if require_ssl else "disable",
97        )
98        self.conn.autocommit = True
99        self.dialect = Dialect.MZ if "Materialize" in self.version() else Dialect.PG
conn
dialect
def close(self) -> None:
101    def close(self) -> None:
102        self.conn.close()
def version(self) -> str:
104    def version(self) -> str:
105        result = self.query_one("SELECT version()")
106        return cast(str, result[0])
def mz_version(self) -> str | None:
108    def mz_version(self) -> str | None:
109        if self.dialect == Dialect.MZ:
110            result = self.query_one("SELECT mz_version()")
111            return cast(str, result[0])
112        else:
113            return None
def drop_database(self, scenario: misc.python.materialize.optbench.Scenario) -> None:
115    def drop_database(self, scenario: Scenario) -> None:
116        logging.debug(f'Drop database "{scenario}"')
117        self.execute(f"DROP DATABASE IF EXISTS {scenario}")
def create_database(self, scenario: misc.python.materialize.optbench.Scenario) -> None:
119    def create_database(self, scenario: Scenario) -> None:
120        logging.debug(f'Create database "{scenario}"')
121        self.execute(f"CREATE DATABASE {scenario}")
def explain( self, query: Query, timing: bool) -> ExplainOutput:
123    def explain(self, query: Query, timing: bool) -> "ExplainOutput":
124        result = self.query_all(query.explain(timing, self.dialect))
125        return ExplainOutput("\n".join([col for line in result for col in line]))
def execute(self, statement: str) -> None:
127    def execute(self, statement: str) -> None:
128        with self.conn.cursor() as cursor:
129            cursor.execute(statement.encode())
def execute_all(self, statements: list[str]) -> None:
131    def execute_all(self, statements: list[str]) -> None:
132        with self.conn.cursor() as cursor:
133            for statement in statements:
134                cursor.execute(statement.encode())
def query_one(self, query: str) -> dict[typing.Any, typing.Any]:
136    def query_one(self, query: str) -> dict[Any, Any]:
137        with self.conn.cursor() as cursor:
138            cursor.execute(query.encode())
139            return cast(dict[Any, Any], cursor.fetchone())
def query_all(self, query: str) -> dict[typing.Any, typing.Any]:
141    def query_all(self, query: str) -> dict[Any, Any]:
142        with self.conn.cursor() as cursor:
143            cursor.execute(query.encode())
144            return cast(dict[Any, Any], cursor.fetchall())
def parse_from_file(path: pathlib.Path) -> list[str]:
151def parse_from_file(path: Path) -> list[str]:
152    """Parses a *.sql file to a list of queries."""
153    return sqlparse.split(path.read_text())

Parses a *.sql file to a list of queries.