Module materialize.scalability.endpoint
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from typing import Any
import psycopg
class Endpoint:
_version: str | None = None
def __init__(self, specified_target: str):
self._specified_target = specified_target
def sql_connection(
self, quiet: bool = False
) -> psycopg.connection.Connection[tuple[Any, ...]]:
if not quiet:
print(f"Connecting to URL: {self.url()}")
conn = psycopg.connect(self.url())
conn.autocommit = True
return conn
def url(self) -> str:
return (
f"postgresql://{self.user()}:{self.password()}@{self.host()}:{self.port()}"
)
def specified_target(self) -> str:
return self._specified_target
def resolved_target(self) -> str:
return self.specified_target()
def host(self) -> str:
raise NotImplementedError
def user(self) -> str:
raise NotImplementedError
def password(self) -> str:
raise NotImplementedError
def port(self) -> int:
raise NotImplementedError
def up(self) -> None:
raise NotImplementedError
def sql(self, sql: str) -> None:
conn = self.sql_connection()
cursor = conn.cursor()
cursor.execute(sql.encode("utf8"))
def try_load_version(self) -> str:
"""
Tries to load the version from the database or returns 'unknown' otherwise.
This first invocation requires the endpoint to be up; subsequent invocations will use the cached information.
"""
if self._version is not None:
return self._version
try:
cursor = self.sql_connection().cursor()
cursor.execute(b"SELECT mz_version()")
row = cursor.fetchone()
assert row is not None
self._version = str(row[0])
return self._version
except:
return "unknown"
Classes
class Endpoint (specified_target: str)
-
Expand source code Browse git
class Endpoint: _version: str | None = None def __init__(self, specified_target: str): self._specified_target = specified_target def sql_connection( self, quiet: bool = False ) -> psycopg.connection.Connection[tuple[Any, ...]]: if not quiet: print(f"Connecting to URL: {self.url()}") conn = psycopg.connect(self.url()) conn.autocommit = True return conn def url(self) -> str: return ( f"postgresql://{self.user()}:{self.password()}@{self.host()}:{self.port()}" ) def specified_target(self) -> str: return self._specified_target def resolved_target(self) -> str: return self.specified_target() def host(self) -> str: raise NotImplementedError def user(self) -> str: raise NotImplementedError def password(self) -> str: raise NotImplementedError def port(self) -> int: raise NotImplementedError def up(self) -> None: raise NotImplementedError def sql(self, sql: str) -> None: conn = self.sql_connection() cursor = conn.cursor() cursor.execute(sql.encode("utf8")) def try_load_version(self) -> str: """ Tries to load the version from the database or returns 'unknown' otherwise. This first invocation requires the endpoint to be up; subsequent invocations will use the cached information. """ if self._version is not None: return self._version try: cursor = self.sql_connection().cursor() cursor.execute(b"SELECT mz_version()") row = cursor.fetchone() assert row is not None self._version = str(row[0]) return self._version except: return "unknown"
Subclasses
Methods
def host(self) ‑> str
-
Expand source code Browse git
def host(self) -> str: raise NotImplementedError
def password(self) ‑> str
-
Expand source code Browse git
def password(self) -> str: raise NotImplementedError
def port(self) ‑> int
-
Expand source code Browse git
def port(self) -> int: raise NotImplementedError
def resolved_target(self) ‑> str
-
Expand source code Browse git
def resolved_target(self) -> str: return self.specified_target()
def specified_target(self) ‑> str
-
Expand source code Browse git
def specified_target(self) -> str: return self._specified_target
def sql(self, sql: str) ‑> None
-
Expand source code Browse git
def sql(self, sql: str) -> None: conn = self.sql_connection() cursor = conn.cursor() cursor.execute(sql.encode("utf8"))
def sql_connection(self, quiet: bool = False) ‑> psycopg.Connection[tuple[typing.Any, ...]]
-
Expand source code Browse git
def sql_connection( self, quiet: bool = False ) -> psycopg.connection.Connection[tuple[Any, ...]]: if not quiet: print(f"Connecting to URL: {self.url()}") conn = psycopg.connect(self.url()) conn.autocommit = True return conn
def try_load_version(self) ‑> str
-
Tries to load the version from the database or returns 'unknown' otherwise. This first invocation requires the endpoint to be up; subsequent invocations will use the cached information.
Expand source code Browse git
def try_load_version(self) -> str: """ Tries to load the version from the database or returns 'unknown' otherwise. This first invocation requires the endpoint to be up; subsequent invocations will use the cached information. """ if self._version is not None: return self._version try: cursor = self.sql_connection().cursor() cursor.execute(b"SELECT mz_version()") row = cursor.fetchone() assert row is not None self._version = str(row[0]) return self._version except: return "unknown"
def up(self) ‑> None
-
Expand source code Browse git
def up(self) -> None: raise NotImplementedError
def url(self) ‑> str
-
Expand source code Browse git
def url(self) -> str: return ( f"postgresql://{self.user()}:{self.password()}@{self.host()}:{self.port()}" )
def user(self) ‑> str
-
Expand source code Browse git
def user(self) -> str: raise NotImplementedError