Module materialize.cloudtest.util.sql

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import logging
from ipaddress import IPv4Address
from typing import Any

import psycopg
from psycopg.abc import Params, Query
from psycopg.connection import Connection

from materialize.cloudtest.util.authentication import AuthConfig
from materialize.cloudtest.util.environment import Environment
from materialize.cloudtest.util.web_request import WebRequests

LOGGER = logging.getLogger(__name__)


def sql_query(
    conn: Connection[Any],
    query: Query,
    vars: Params | None = None,
) -> list[list[Any]]:
    """Execute `query` on `conn` and return every result row as a list.

    Args:
        conn: An open psycopg connection.
        query: The statement to execute.
        vars: Optional parameters bound to `query` by psycopg.

    Returns:
        One list per row, in the order the cursor yields them.
    """
    # Use the cursor as a context manager so it is closed (and its result
    # released) even if execution or row iteration raises.
    with conn.cursor() as cur:
        cur.execute(query, vars)
        # Materialise the rows before the cursor is closed on block exit.
        return [list(row) for row in cur]


def sql_execute(
    conn: Connection[Any],
    query: Query,
    vars: Params | None = None,
) -> None:
    """Execute `query` on `conn`, discarding any result.

    Args:
        conn: An open psycopg connection.
        query: The statement to execute.
        vars: Optional parameters bound to `query` by psycopg.
    """
    # Use the cursor as a context manager so it is closed even if
    # execution raises, instead of being left for garbage collection.
    with conn.cursor() as cur:
        cur.execute(query, vars)

def sql_execute_ddl(
    conn: Connection[Any],
    query: Query,
    vars: Params | None = None,
) -> None:
    """Execute `query` on `conn` through a `psycopg.ClientCursor`.

    Unlike `conn.cursor()`, a `ClientCursor` merges `vars` into the query
    text on the client before sending it.
    NOTE(review): presumably used here because DDL statements cannot take
    server-side bound parameters -- confirm against callers.

    Args:
        conn: An open psycopg connection.
        query: The statement to execute.
        vars: Optional parameters interpolated into `query` client-side.
    """
    # Use the cursor as a context manager so it is closed even if
    # execution raises, instead of being left for garbage collection.
    with psycopg.ClientCursor(conn) as cur:
        cur.execute(query, vars)


def pgwire_sql_conn(auth: AuthConfig, environment: Environment) -> Connection[Any]:
    """Open an autocommit psycopg connection to the environment's SQL address.

    The address is read from the ``regionInfo.sqlAddress`` entry of the value
    returned by ``environment.wait_for_environmentd()`` and is expected to be
    of the form ``host:port``. Credentials and TLS settings come from `auth`.

    Returns:
        A connection to the ``materialize`` database with autocommit enabled.

    Raises:
        ValueError: If the SQL address does not contain exactly one ``:``.
    """
    region_info = environment.wait_for_environmentd()["regionInfo"]
    sql_address: str = region_info["sqlAddress"]
    # Unpacking into two names enforces the "host:port" shape.
    host, port = sql_address.split(":")

    connect_kwargs: dict[str, Any] = {
        "dbname": "materialize",
        "user": auth.app_user,
        "password": auth.app_password,
        "host": host,
        "port": port,
        "sslmode": auth.pgwire_ssl_mode,
        "sslrootcert": auth.tls_ca_cert_path,
    }
    connection = psycopg.connect(**connect_kwargs)
    connection.autocommit = True
    return connection


def sql_query_pgwire(
    auth: AuthConfig,
    environment: Environment,
    query: Query,
    vars: Params | None = None,
) -> list[list[Any]]:
    """Run `query` over a fresh pgwire connection and return its rows.

    A new connection is obtained from `pgwire_sql_conn` for this call and
    used as a context manager for the duration of the query. The query is
    logged at INFO level before it is executed.

    Returns:
        The rows produced by `sql_query` for `query`.
    """
    connection = pgwire_sql_conn(auth, environment)
    with connection:
        LOGGER.info(f"QUERY: {query}")
        rows = sql_query(connection, query, vars)
    return rows


def sql_execute_pgwire(
    auth: AuthConfig,
    environment: Environment,
    query: Query,
    vars: Params | None = None,
) -> None:
    """Run `query` over a fresh pgwire connection, discarding any result.

    A new connection is obtained from `pgwire_sql_conn` for this call and
    used as a context manager for the duration of the statement. The query
    is logged at INFO level before it is executed.
    """
    connection = pgwire_sql_conn(auth, environment)
    with connection:
        LOGGER.info(f"QUERY: {query}")
        # sql_execute returns None, so there is nothing to propagate.
        sql_execute(connection, query, vars)


def sql_query_http(
    auth: AuthConfig, environment: Environment, query: str
) -> list[list[Any]]:
    """POST `query` to the environment's ``/api/sql`` endpoint and return rows.

    The HTTP address is read from the ``regionInfo.httpAddress`` entry of the
    value returned by ``environment.wait_for_environmentd()``.

    Returns:
        The ``rows`` of the first entry in the response's ``results`` list.
    """
    http_address: str = environment.wait_for_environmentd()["regionInfo"][
        "httpAddress"
    ]

    # On the "kind-mzcloud" kubectl context, requests are pinned to loopback.
    override_ip: IPv4Address | None = None
    if environment.env_kubectl.context == "kind-mzcloud":
        override_ip = IPv4Address("127.0.0.1")

    # Plain HTTP only when the address itself mentions loopback.
    if "127.0.0.1" in http_address:
        scheme = "http"
    else:
        scheme = "https"

    # The kind balancer CA certificate is only supplied for HTTPS requests
    # that are being redirected to the override IP.
    verify: str | None = None
    if scheme == "https" and override_ip is not None:
        verify = "misc/kind/balancer/tls/ca-cert.pem"

    client = WebRequests(
        auth,
        f"{scheme}://{http_address}",
        override_ip=override_ip,
        verify=verify,
    )
    payload = {"query": query}
    response = client.post("/api/sql", payload)
    result_rows: list[list[Any]] = response.json()["results"][0]["rows"]
    return result_rows

Functions

def pgwire_sql_conn(auth: AuthConfig, environment: Environment) ‑> psycopg.Connection[typing.Any]
Expand source code Browse git
def pgwire_sql_conn(auth: AuthConfig, environment: Environment) -> Connection[Any]:
    """Open an autocommit psycopg connection to the environment's SQL address.

    The address is read from the ``regionInfo.sqlAddress`` entry of the value
    returned by ``environment.wait_for_environmentd()`` and is expected to be
    of the form ``host:port``. Credentials and TLS settings come from `auth`.

    Returns:
        A connection to the ``materialize`` database with autocommit enabled.

    Raises:
        ValueError: If the SQL address does not contain exactly one ``:``.
    """
    region_info = environment.wait_for_environmentd()["regionInfo"]
    sql_address: str = region_info["sqlAddress"]
    # Unpacking into two names enforces the "host:port" shape.
    host, port = sql_address.split(":")

    connect_kwargs: dict[str, Any] = {
        "dbname": "materialize",
        "user": auth.app_user,
        "password": auth.app_password,
        "host": host,
        "port": port,
        "sslmode": auth.pgwire_ssl_mode,
        "sslrootcert": auth.tls_ca_cert_path,
    }
    connection = psycopg.connect(**connect_kwargs)
    connection.autocommit = True
    return connection
def sql_execute(conn: psycopg.Connection[typing.Any], query: Union[LiteralString, bytes, ForwardRef('sql.SQL'), ForwardRef('sql.Composed')], vars: Union[Sequence[Any], Mapping[str, Any], ForwardRef(None)] = None)
Expand source code Browse git
def sql_execute(
    conn: Connection[Any],
    query: Query,
    vars: Params | None = None,
) -> None:
    """Execute `query` on `conn`, discarding any result.

    Args:
        conn: An open psycopg connection.
        query: The statement to execute.
        vars: Optional parameters bound to `query` by psycopg.
    """
    # Use the cursor as a context manager so it is closed even if
    # execution raises, instead of being left for garbage collection.
    with conn.cursor() as cur:
        cur.execute(query, vars)
def sql_execute_ddl(conn: psycopg.Connection[typing.Any], query: Union[LiteralString, bytes, ForwardRef('sql.SQL'), ForwardRef('sql.Composed')], vars: Union[Sequence[Any], Mapping[str, Any], ForwardRef(None)] = None)
Expand source code Browse git
def sql_execute_ddl(
    conn: Connection[Any],
    query: Query,
    vars: Params | None = None,
) -> None:
    """Execute `query` on `conn` through a `psycopg.ClientCursor`.

    Unlike `conn.cursor()`, a `ClientCursor` merges `vars` into the query
    text on the client before sending it.
    NOTE(review): presumably used here because DDL statements cannot take
    server-side bound parameters -- confirm against callers.

    Args:
        conn: An open psycopg connection.
        query: The statement to execute.
        vars: Optional parameters interpolated into `query` client-side.
    """
    # Use the cursor as a context manager so it is closed even if
    # execution raises, instead of being left for garbage collection.
    with psycopg.ClientCursor(conn) as cur:
        cur.execute(query, vars)
def sql_execute_pgwire(auth: AuthConfig, environment: Environment, query: Union[LiteralString, bytes, ForwardRef('sql.SQL'), ForwardRef('sql.Composed')], vars: Union[Sequence[Any], Mapping[str, Any], ForwardRef(None)] = None)
Expand source code Browse git
def sql_execute_pgwire(
    auth: AuthConfig,
    environment: Environment,
    query: Query,
    vars: Params | None = None,
) -> None:
    """Run `query` over a fresh pgwire connection, discarding any result.

    A new connection is obtained from `pgwire_sql_conn` for this call and
    used as a context manager for the duration of the statement. The query
    is logged at INFO level before it is executed.
    """
    connection = pgwire_sql_conn(auth, environment)
    with connection:
        LOGGER.info(f"QUERY: {query}")
        # sql_execute returns None, so there is nothing to propagate.
        sql_execute(connection, query, vars)
def sql_query(conn: psycopg.Connection[typing.Any], query: Union[LiteralString, bytes, ForwardRef('sql.SQL'), ForwardRef('sql.Composed')], vars: Union[Sequence[Any], Mapping[str, Any], ForwardRef(None)] = None) ‑> list[list[typing.Any]]
Expand source code Browse git
def sql_query(
    conn: Connection[Any],
    query: Query,
    vars: Params | None = None,
) -> list[list[Any]]:
    """Execute `query` on `conn` and return every result row as a list.

    Args:
        conn: An open psycopg connection.
        query: The statement to execute.
        vars: Optional parameters bound to `query` by psycopg.

    Returns:
        One list per row, in the order the cursor yields them.
    """
    # Use the cursor as a context manager so it is closed (and its result
    # released) even if execution or row iteration raises.
    with conn.cursor() as cur:
        cur.execute(query, vars)
        # Materialise the rows before the cursor is closed on block exit.
        return [list(row) for row in cur]
def sql_query_http(auth: AuthConfig, environment: Environment, query: str) ‑> list[list[typing.Any]]
Expand source code Browse git
def sql_query_http(
    auth: AuthConfig, environment: Environment, query: str
) -> list[list[Any]]:
    """POST `query` to the environment's ``/api/sql`` endpoint and return rows.

    The HTTP address is read from the ``regionInfo.httpAddress`` entry of the
    value returned by ``environment.wait_for_environmentd()``.

    Returns:
        The ``rows`` of the first entry in the response's ``results`` list.
    """
    http_address: str = environment.wait_for_environmentd()["regionInfo"][
        "httpAddress"
    ]

    # On the "kind-mzcloud" kubectl context, requests are pinned to loopback.
    override_ip: IPv4Address | None = None
    if environment.env_kubectl.context == "kind-mzcloud":
        override_ip = IPv4Address("127.0.0.1")

    # Plain HTTP only when the address itself mentions loopback.
    if "127.0.0.1" in http_address:
        scheme = "http"
    else:
        scheme = "https"

    # The kind balancer CA certificate is only supplied for HTTPS requests
    # that are being redirected to the override IP.
    verify: str | None = None
    if scheme == "https" and override_ip is not None:
        verify = "misc/kind/balancer/tls/ca-cert.pem"

    client = WebRequests(
        auth,
        f"{scheme}://{http_address}",
        override_ip=override_ip,
        verify=verify,
    )
    payload = {"query": query}
    response = client.post("/api/sql", payload)
    result_rows: list[list[Any]] = response.json()["results"][0]["rows"]
    return result_rows
def sql_query_pgwire(auth: AuthConfig, environment: Environment, query: Union[LiteralString, bytes, ForwardRef('sql.SQL'), ForwardRef('sql.Composed')], vars: Union[Sequence[Any], Mapping[str, Any], ForwardRef(None)] = None) ‑> list[list[typing.Any]]
Expand source code Browse git
def sql_query_pgwire(
    auth: AuthConfig,
    environment: Environment,
    query: Query,
    vars: Params | None = None,
) -> list[list[Any]]:
    """Run `query` over a fresh pgwire connection and return its rows.

    A new connection is obtained from `pgwire_sql_conn` for this call and
    used as a context manager for the duration of the query. The query is
    logged at INFO level before it is executed.

    Returns:
        The rows produced by `sql_query` for `query`.
    """
    connection = pgwire_sql_conn(auth, environment)
    with connection:
        LOGGER.info(f"QUERY: {query}")
        rows = sql_query(connection, query, vars)
    return rows