Module materialize.scalability.endpoint

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

from typing import Any

import psycopg


class Endpoint:

    _version: str | None = None

    def __init__(self, specified_target: str):
        self._specified_target = specified_target

    def sql_connection(
        self, quiet: bool = False
    ) -> psycopg.connection.Connection[tuple[Any, ...]]:
        if not quiet:
            print(f"Connecting to URL: {self.url()}")

        conn = psycopg.connect(self.url())
        conn.autocommit = True
        return conn

    def url(self) -> str:
        return (
            f"postgresql://{self.user()}:{self.password()}@{self.host()}:{self.port()}"
        )

    def specified_target(self) -> str:
        return self._specified_target

    def resolved_target(self) -> str:
        return self.specified_target()

    def host(self) -> str:
        raise NotImplementedError

    def user(self) -> str:
        raise NotImplementedError

    def password(self) -> str:
        raise NotImplementedError

    def port(self) -> int:
        raise NotImplementedError

    def up(self) -> None:
        raise NotImplementedError

    def sql(self, sql: str) -> None:
        conn = self.sql_connection()
        cursor = conn.cursor()
        cursor.execute(sql.encode("utf8"))

    def try_load_version(self) -> str:
        """
        Tries to load the version from the database or returns 'unknown' otherwise.
        This first invocation requires the endpoint to be up; subsequent invocations will use the cached information.
        """

        if self._version is not None:
            return self._version

        try:
            cursor = self.sql_connection().cursor()
            cursor.execute(b"SELECT mz_version()")
            row = cursor.fetchone()
            assert row is not None
            self._version = str(row[0])
            return self._version
        except:
            return "unknown"

Classes

class Endpoint (specified_target: str)
Expand source code Browse git
class Endpoint:

    _version: str | None = None

    def __init__(self, specified_target: str):
        self._specified_target = specified_target

    def sql_connection(
        self, quiet: bool = False
    ) -> psycopg.connection.Connection[tuple[Any, ...]]:
        if not quiet:
            print(f"Connecting to URL: {self.url()}")

        conn = psycopg.connect(self.url())
        conn.autocommit = True
        return conn

    def url(self) -> str:
        return (
            f"postgresql://{self.user()}:{self.password()}@{self.host()}:{self.port()}"
        )

    def specified_target(self) -> str:
        return self._specified_target

    def resolved_target(self) -> str:
        return self.specified_target()

    def host(self) -> str:
        raise NotImplementedError

    def user(self) -> str:
        raise NotImplementedError

    def password(self) -> str:
        raise NotImplementedError

    def port(self) -> int:
        raise NotImplementedError

    def up(self) -> None:
        raise NotImplementedError

    def sql(self, sql: str) -> None:
        conn = self.sql_connection()
        cursor = conn.cursor()
        cursor.execute(sql.encode("utf8"))

    def try_load_version(self) -> str:
        """
        Tries to load the version from the database or returns 'unknown' otherwise.
        This first invocation requires the endpoint to be up; subsequent invocations will use the cached information.
        """

        if self._version is not None:
            return self._version

        try:
            cursor = self.sql_connection().cursor()
            cursor.execute(b"SELECT mz_version()")
            row = cursor.fetchone()
            assert row is not None
            self._version = str(row[0])
            return self._version
        except:
            return "unknown"

Subclasses

Methods

def host(self) ‑> str
Expand source code Browse git
def host(self) -> str:
    raise NotImplementedError
def password(self) ‑> str
Expand source code Browse git
def password(self) -> str:
    raise NotImplementedError
def port(self) ‑> int
Expand source code Browse git
def port(self) -> int:
    raise NotImplementedError
def resolved_target(self) ‑> str
Expand source code Browse git
def resolved_target(self) -> str:
    return self.specified_target()
def specified_target(self) ‑> str
Expand source code Browse git
def specified_target(self) -> str:
    return self._specified_target
def sql(self, sql: str) ‑> None
Expand source code Browse git
def sql(self, sql: str) -> None:
    conn = self.sql_connection()
    cursor = conn.cursor()
    cursor.execute(sql.encode("utf8"))
def sql_connection(self, quiet: bool = False) ‑> psycopg.Connection[tuple[typing.Any, ...]]
Expand source code Browse git
def sql_connection(
    self, quiet: bool = False
) -> psycopg.connection.Connection[tuple[Any, ...]]:
    if not quiet:
        print(f"Connecting to URL: {self.url()}")

    conn = psycopg.connect(self.url())
    conn.autocommit = True
    return conn
def try_load_version(self) ‑> str

Tries to load the version from the database or returns 'unknown' otherwise. This first invocation requires the endpoint to be up; subsequent invocations will use the cached information.

Expand source code Browse git
def try_load_version(self) -> str:
    """
    Tries to load the version from the database or returns 'unknown' otherwise.
    This first invocation requires the endpoint to be up; subsequent invocations will use the cached information.
    """

    if self._version is not None:
        return self._version

    try:
        cursor = self.sql_connection().cursor()
        cursor.execute(b"SELECT mz_version()")
        row = cursor.fetchone()
        assert row is not None
        self._version = str(row[0])
        return self._version
    except:
        return "unknown"
def up(self) ‑> None
Expand source code Browse git
def up(self) -> None:
    raise NotImplementedError
def url(self) ‑> str
Expand source code Browse git
def url(self) -> str:
    return (
        f"postgresql://{self.user()}:{self.password()}@{self.host()}:{self.port()}"
    )
def user(self) ‑> str
Expand source code Browse git
def user(self) -> str:
    raise NotImplementedError