misc.python.materialize.optbench.sql
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import logging
import re
from enum import Enum
from pathlib import Path
from typing import Any, cast

import numpy as np
import psycopg
import sqlparse

from . import Scenario, util


class Dialect(Enum):
    """Dialect of the database under test.

    `Database` selects MZ when the server's version() string contains
    "Materialize" and PG otherwise.
    """

    PG = 0
    MZ = 1


class Query:
    """An API for manipulating workload queries."""

    def __init__(self, query: str) -> None:
        self.query = query

    def __str__(self) -> str:
        return self.query

    def name(self) -> str:
        """Extracts and returns the name of this query from a '-- name: {name}' comment.
        Returns 'anonymous' if the name is not set."""
        p = r"-- name\: (?P<name>.+)"
        m = re.search(p, self.query, re.MULTILINE)
        # The fallback used to be misspelled "anonoymous", contradicting the
        # docstring above.
        return m.group("name") if m else "anonymous"

    def explain(self, timing: bool, dialect: Dialect = Dialect.MZ) -> str:
        """Prepends 'EXPLAIN ...' to the query respecting the given dialect.

        Without `timing` both dialects get a bare 'EXPLAIN' line. With
        `timing`, PG gets 'EXPLAIN (ANALYZE, TIMING TRUE)' and every other
        dialect gets 'EXPLAIN WITH(timing)'.
        """

        if dialect == Dialect.PG:
            if timing:
                return "\n".join(["EXPLAIN (ANALYZE, TIMING TRUE)", self.query])
            else:
                return "\n".join(["EXPLAIN", self.query])
        else:
            if timing:
                return "\n".join(["EXPLAIN WITH(timing)", self.query])
            else:
                return "\n".join(["EXPLAIN", self.query])


class ExplainOutput:
    """An API for manipulating 'EXPLAIN ... PLAN FOR' results."""

    def __init__(self, output: str) -> None:
        self.output = output

    def __str__(self) -> str:
        return self.output

    def optimization_time(self) -> np.timedelta64 | None:
        """Returns the optimization time reported in an 'EXPLAIN' output.

        Looks for the first 'Optimization time: ...' or 'Planning Time: ...'
        entry and converts its value with `util.duration_to_timedelta`.
        Returns None when the output contains neither entry.
        """
        p = r"(Optimization time|Planning Time)\: (?P<time>[0-9]+(\.[0-9]+)?\s?\S+)"
        m = re.search(p, self.output, re.MULTILINE)
        return util.duration_to_timedelta(m["time"]) if m else None


class Database:
    """An API to the database under test."""

    def __init__(
        self,
        port: int,
        host: str,
        user: str,
        password: str | None,
        database: str | None,
        require_ssl: bool,
    ) -> None:
        """Connects to the server and detects which dialect it speaks.

        The connection uses sslmode "require" when `require_ssl` is true and
        "disable" otherwise, and is switched to autocommit.
        """
        logging.debug(f"Initialize Database with host={host} port={port}, user={user}")

        self.conn = psycopg.connect(
            host=host,
            port=port,
            user=user,
            password=password,
            dbname=database,
            sslmode="require" if require_ssl else "disable",
        )
        self.conn.autocommit = True
        try:
            self.dialect = Dialect.MZ if "Materialize" in self.version() else Dialect.PG
        except Exception:
            # The version probe is the first statement sent; do not leave the
            # freshly opened connection behind if it fails.
            self.conn.close()
            raise

    def close(self) -> None:
        """Closes the underlying connection."""
        self.conn.close()

    def version(self) -> str:
        """Returns the first column of 'SELECT version()'."""
        result = self.query_one("SELECT version()")
        return cast(str, result[0])

    def mz_version(self) -> str | None:
        """Returns the first column of 'SELECT mz_version()', or None for a non-MZ dialect."""
        if self.dialect == Dialect.MZ:
            result = self.query_one("SELECT mz_version()")
            return cast(str, result[0])
        else:
            return None

    def drop_database(self, scenario: Scenario) -> None:
        """Drops the database named after `scenario` if it exists."""
        logging.debug(f'Drop database "{scenario}"')
        self.execute(f"DROP DATABASE IF EXISTS {scenario}")

    def create_database(self, scenario: Scenario) -> None:
        """Creates a database named after `scenario`."""
        logging.debug(f'Create database "{scenario}"')
        self.execute(f"CREATE DATABASE {scenario}")

    def explain(self, query: Query, timing: bool) -> "ExplainOutput":
        """Runs the EXPLAIN form of `query` for this connection's dialect.

        Every column of every returned row becomes one line of the output.
        """
        result = self.query_all(query.explain(timing, self.dialect))
        return ExplainOutput("\n".join([col for line in result for col in line]))

    def execute(self, statement: str) -> None:
        """Executes a single statement, discarding any result."""
        with self.conn.cursor() as cursor:
            cursor.execute(statement.encode())

    def execute_all(self, statements: list[str]) -> None:
        """Executes the statements in order on one cursor, discarding results."""
        with self.conn.cursor() as cursor:
            for statement in statements:
                cursor.execute(statement.encode())

    def query_one(self, query: str) -> tuple[Any, ...]:
        """Executes `query` and returns what `cursor.fetchone()` yields.

        The connection is opened without a row factory, so rows are indexed
        by position (see `version`). The cast is unchecked.
        NOTE(review): fetchone() presumably yields None when there is no
        row - confirm callers only use this for single-row queries.
        """
        with self.conn.cursor() as cursor:
            cursor.execute(query.encode())
            return cast(tuple[Any, ...], cursor.fetchone())

    def query_all(self, query: str) -> list[tuple[Any, ...]]:
        """Executes `query` and returns all rows from `cursor.fetchall()`."""
        with self.conn.cursor() as cursor:
            cursor.execute(query.encode())
            return cast(list[tuple[Any, ...]], cursor.fetchall())


# Utility functions
# -----------------


def parse_from_file(path: Path) -> list[str]:
    """Parses a *.sql file to a list of queries."""
    return sqlparse.split(path.read_text())
class
Dialect(enum.Enum):
PG =
<Dialect.PG: 0>
MZ =
<Dialect.MZ: 1>
class
Query:
class Query:
    """An API for manipulating workload queries."""

    def __init__(self, query: str) -> None:
        self.query = query

    def __str__(self) -> str:
        return self.query

    def name(self) -> str:
        """Extracts and returns the name of this query from a '-- name: {name}' comment.
        Returns 'anonymous' if the name is not set."""
        p = r"-- name\: (?P<name>.+)"
        m = re.search(p, self.query, re.MULTILINE)
        # The fallback used to be misspelled "anonoymous", contradicting the
        # docstring above.
        return m.group("name") if m else "anonymous"

    def explain(self, timing: bool, dialect: Dialect = Dialect.MZ) -> str:
        """Prepends 'EXPLAIN ...' to the query respecting the given dialect.

        Without `timing` both dialects get a bare 'EXPLAIN' line. With
        `timing`, PG gets 'EXPLAIN (ANALYZE, TIMING TRUE)' and every other
        dialect gets 'EXPLAIN WITH(timing)'.
        """

        if dialect == Dialect.PG:
            if timing:
                return "\n".join(["EXPLAIN (ANALYZE, TIMING TRUE)", self.query])
            else:
                return "\n".join(["EXPLAIN", self.query])
        else:
            if timing:
                return "\n".join(["EXPLAIN WITH(timing)", self.query])
            else:
                return "\n".join(["EXPLAIN", self.query])
An API for manipulating workload queries.
def
name(self) -> str:
def name(self) -> str:
    """Extracts and returns the name of this query from a '-- name: {name}' comment.

    The name is everything after '-- name: ' up to the end of that line.
    Returns 'anonymous' if the query text has no such comment.
    """
    match = re.search(r"-- name\: (?P<name>.+)", self.query, re.MULTILINE)
    # The fallback used to be misspelled "anonoymous", contradicting the
    # documented 'anonymous'.
    return match.group("name") if match else "anonymous"
Extracts and returns the name of this query from a '-- name: {name}' comment. Returns 'anonymous' if the name is not set.
def explain(self, timing: bool, dialect: Dialect = Dialect.MZ) -> str:
    """Return the query text with a dialect-appropriate 'EXPLAIN ...' line in front.

    The prefix and the query are joined by a single newline.
    """
    if not timing:
        # Both dialects use the bare keyword when no timing is requested.
        prefix = "EXPLAIN"
    elif dialect == Dialect.PG:
        prefix = "EXPLAIN (ANALYZE, TIMING TRUE)"
    else:
        prefix = "EXPLAIN WITH(timing)"
    return "\n".join((prefix, self.query))
Prepends 'EXPLAIN ...' to the query respecting the given dialect.
class
ExplainOutput:
60class ExplainOutput: 61 """An API for manipulating 'EXPLAIN ... PLAN FOR' results.""" 62 63 def __init__(self, output: str) -> None: 64 self.output = output 65 66 def __str__(self) -> str: 67 return self.output 68 69 def optimization_time(self) -> np.timedelta64 | None: 70 """Optionally, returns the optimization_time time for an 'EXPLAIN' output.""" 71 p = r"(Optimization time|Planning Time)\: (?P<time>[0-9]+(\.[0-9]+)?\s?\S+)" 72 m = re.search(p, self.output, re.MULTILINE) 73 return util.duration_to_timedelta(m["time"]) if m else None
An API for manipulating 'EXPLAIN ... PLAN FOR' results.
def
optimization_time(self) -> numpy.timedelta64 | None:
69 def optimization_time(self) -> np.timedelta64 | None: 70 """Optionally, returns the optimization_time time for an 'EXPLAIN' output.""" 71 p = r"(Optimization time|Planning Time)\: (?P<time>[0-9]+(\.[0-9]+)?\s?\S+)" 72 m = re.search(p, self.output, re.MULTILINE) 73 return util.duration_to_timedelta(m["time"]) if m else None
Returns the optimization time reported in an 'EXPLAIN' output, or None if the output contains no timing entry.
class
Database:
class Database:
    """An API to the database under test."""

    def __init__(
        self,
        port: int,
        host: str,
        user: str,
        password: str | None,
        database: str | None,
        require_ssl: bool,
    ) -> None:
        """Connects to the server and detects which dialect it speaks.

        The connection uses sslmode "require" when `require_ssl` is true and
        "disable" otherwise, and is switched to autocommit.
        """
        logging.debug(f"Initialize Database with host={host} port={port}, user={user}")

        self.conn = psycopg.connect(
            host=host,
            port=port,
            user=user,
            password=password,
            dbname=database,
            sslmode="require" if require_ssl else "disable",
        )
        self.conn.autocommit = True
        try:
            self.dialect = Dialect.MZ if "Materialize" in self.version() else Dialect.PG
        except Exception:
            # The version probe is the first statement sent; do not leave the
            # freshly opened connection behind if it fails.
            self.conn.close()
            raise

    def close(self) -> None:
        """Closes the underlying connection."""
        self.conn.close()

    def version(self) -> str:
        """Returns the first column of 'SELECT version()'."""
        result = self.query_one("SELECT version()")
        return cast(str, result[0])

    def mz_version(self) -> str | None:
        """Returns the first column of 'SELECT mz_version()', or None for a non-MZ dialect."""
        if self.dialect == Dialect.MZ:
            result = self.query_one("SELECT mz_version()")
            return cast(str, result[0])
        else:
            return None

    def drop_database(self, scenario: Scenario) -> None:
        """Drops the database named after `scenario` if it exists."""
        logging.debug(f'Drop database "{scenario}"')
        self.execute(f"DROP DATABASE IF EXISTS {scenario}")

    def create_database(self, scenario: Scenario) -> None:
        """Creates a database named after `scenario`."""
        logging.debug(f'Create database "{scenario}"')
        self.execute(f"CREATE DATABASE {scenario}")

    def explain(self, query: Query, timing: bool) -> "ExplainOutput":
        """Runs the EXPLAIN form of `query` for this connection's dialect.

        Every column of every returned row becomes one line of the output.
        """
        result = self.query_all(query.explain(timing, self.dialect))
        return ExplainOutput("\n".join([col for line in result for col in line]))

    def execute(self, statement: str) -> None:
        """Executes a single statement, discarding any result."""
        with self.conn.cursor() as cursor:
            cursor.execute(statement.encode())

    def execute_all(self, statements: list[str]) -> None:
        """Executes the statements in order on one cursor, discarding results."""
        with self.conn.cursor() as cursor:
            for statement in statements:
                cursor.execute(statement.encode())

    def query_one(self, query: str) -> tuple[Any, ...]:
        """Executes `query` and returns what `cursor.fetchone()` yields.

        The connection is opened without a row factory, so rows are indexed
        by position (see `version`). The cast is unchecked.
        NOTE(review): fetchone() presumably yields None when there is no
        row - confirm callers only use this for single-row queries.
        """
        with self.conn.cursor() as cursor:
            cursor.execute(query.encode())
            return cast(tuple[Any, ...], cursor.fetchone())

    def query_all(self, query: str) -> list[tuple[Any, ...]]:
        """Executes `query` and returns all rows from `cursor.fetchall()`."""
        with self.conn.cursor() as cursor:
            cursor.execute(query.encode())
            return cast(list[tuple[Any, ...]], cursor.fetchall())
An API to the database under test.
Database( port: int, host: str, user: str, password: str | None, database: str | None, require_ssl: bool)
def __init__(
    self,
    port: int,
    host: str,
    user: str,
    password: str | None,
    database: str | None,
    require_ssl: bool,
) -> None:
    """Connects to the server and detects which dialect it speaks.

    The connection uses sslmode "require" when `require_ssl` is true and
    "disable" otherwise, and is switched to autocommit. The dialect is MZ
    when the server's version() string contains "Materialize", PG otherwise.

    If the version probe raises, the connection is closed before the
    exception propagates.
    """
    logging.debug(f"Initialize Database with host={host} port={port}, user={user}")

    self.conn = psycopg.connect(
        host=host,
        port=port,
        user=user,
        password=password,
        dbname=database,
        sslmode="require" if require_ssl else "disable",
    )
    self.conn.autocommit = True
    try:
        self.dialect = Dialect.MZ if "Materialize" in self.version() else Dialect.PG
    except Exception:
        # The version probe is the first statement sent; do not leave the
        # freshly opened connection behind if it fails.
        self.conn.close()
        raise
def
parse_from_file(path: pathlib.Path) -> list[str]:
def parse_from_file(path: Path) -> list[str]:
    """Read the *.sql file at `path` and split its text into a list of queries."""
    sql_text = path.read_text()
    return sqlparse.split(sql_text)
Parses a *.sql file to a list of queries.