Module materialize.cloudtest.k8s.api.k8s_service

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.


import logging
from typing import Any

import pg8000
import sqlparse
from kubernetes.client import V1Service
from pg8000 import Connection, Cursor

from materialize.cloudtest.k8s.api.k8s_resource import K8sResource

LOGGER = logging.getLogger(__name__)


class K8sService(K8sResource):
    service: V1Service

    def kind(self) -> str:
        return "service"

    def create(self) -> None:
        core_v1_api = self.api()
        core_v1_api.create_namespaced_service(
            body=self.service, namespace=self.namespace()
        )

    def node_port(self, name: str | None = None) -> int:
        assert self.service and self.service.metadata and self.service.metadata.name
        service = self.api().read_namespaced_service(
            self.service.metadata.name, self.namespace()
        )
        assert service is not None

        spec = service.spec
        assert spec is not None

        ports = spec.ports
        assert ports is not None and len(ports) > 0

        port = next(p for p in ports if name is None or p.name == name)

        node_port = port.node_port
        assert node_port is not None

        return node_port

    def sql_conn(
        self,
        port: str | None = None,
        user: str = "materialize",
    ) -> Connection:
        """Get a connection to run SQL queries against the service"""
        return pg8000.connect(
            host="localhost",
            port=self.node_port(name=port),
            user=user,
        )

    def sql_cursor(
        self,
        port: str | None = None,
        user: str = "materialize",
        autocommit: bool = True,
    ) -> Cursor:
        """Get a cursor to run SQL queries against the service"""
        conn = self.sql_conn(port=port, user=user)
        conn.autocommit = autocommit
        return conn.cursor()

    def sql(
        self,
        sql: str,
        port: str | None = None,
        user: str = "materialize",
    ) -> None:
        """Run a batch of SQL statements against the service."""
        with self.sql_cursor(port=port, user=user) as cursor:
            for statement in sqlparse.split(sql):
                LOGGER.info(f"> {statement}")
                cursor.execute(statement)

    def sql_query(
        self,
        sql: str,
        port: str | None = None,
        user: str = "materialize",
    ) -> Any:
        """Execute a SQL query against the service and return results."""
        with self.sql_cursor(port=port, user=user) as cursor:
            LOGGER.info(f"> {sql}")
            cursor.execute(sql)
            return cursor.fetchall()

Classes

class K8sService (namespace: str)
Expand source code Browse git
class K8sService(K8sResource):
    service: V1Service

    def kind(self) -> str:
        return "service"

    def create(self) -> None:
        core_v1_api = self.api()
        core_v1_api.create_namespaced_service(
            body=self.service, namespace=self.namespace()
        )

    def node_port(self, name: str | None = None) -> int:
        assert self.service and self.service.metadata and self.service.metadata.name
        service = self.api().read_namespaced_service(
            self.service.metadata.name, self.namespace()
        )
        assert service is not None

        spec = service.spec
        assert spec is not None

        ports = spec.ports
        assert ports is not None and len(ports) > 0

        port = next(p for p in ports if name is None or p.name == name)

        node_port = port.node_port
        assert node_port is not None

        return node_port

    def sql_conn(
        self,
        port: str | None = None,
        user: str = "materialize",
    ) -> Connection:
        """Get a connection to run SQL queries against the service"""
        return pg8000.connect(
            host="localhost",
            port=self.node_port(name=port),
            user=user,
        )

    def sql_cursor(
        self,
        port: str | None = None,
        user: str = "materialize",
        autocommit: bool = True,
    ) -> Cursor:
        """Get a cursor to run SQL queries against the service"""
        conn = self.sql_conn(port=port, user=user)
        conn.autocommit = autocommit
        return conn.cursor()

    def sql(
        self,
        sql: str,
        port: str | None = None,
        user: str = "materialize",
    ) -> None:
        """Run a batch of SQL statements against the service."""
        with self.sql_cursor(port=port, user=user) as cursor:
            for statement in sqlparse.split(sql):
                LOGGER.info(f"> {statement}")
                cursor.execute(statement)

    def sql_query(
        self,
        sql: str,
        port: str | None = None,
        user: str = "materialize",
    ) -> Any:
        """Execute a SQL query against the service and return results."""
        with self.sql_cursor(port=port, user=user) as cursor:
            LOGGER.info(f"> {sql}")
            cursor.execute(sql)
            return cursor.fetchall()

Ancestors

Subclasses

Class variables

var service : kubernetes.client.models.v1_service.V1Service

Methods

def create(self) ‑> None
Expand source code Browse git
def create(self) -> None:
    core_v1_api = self.api()
    core_v1_api.create_namespaced_service(
        body=self.service, namespace=self.namespace()
    )
def kind(self) ‑> str
Expand source code Browse git
def kind(self) -> str:
    return "service"
def node_port(self, name: str | None = None) ‑> int
Expand source code Browse git
def node_port(self, name: str | None = None) -> int:
    assert self.service and self.service.metadata and self.service.metadata.name
    service = self.api().read_namespaced_service(
        self.service.metadata.name, self.namespace()
    )
    assert service is not None

    spec = service.spec
    assert spec is not None

    ports = spec.ports
    assert ports is not None and len(ports) > 0

    port = next(p for p in ports if name is None or p.name == name)

    node_port = port.node_port
    assert node_port is not None

    return node_port
def sql(self, sql: str, port: str | None = None, user: str = 'materialize') ‑> None

Run a batch of SQL statements against the service.

Expand source code Browse git
def sql(
    self,
    sql: str,
    port: str | None = None,
    user: str = "materialize",
) -> None:
    """Run a batch of SQL statements against the service."""
    with self.sql_cursor(port=port, user=user) as cursor:
        for statement in sqlparse.split(sql):
            LOGGER.info(f"> {statement}")
            cursor.execute(statement)
def sql_conn(self, port: str | None = None, user: str = 'materialize') ‑> pg8000.legacy.Connection

Get a connection to run SQL queries against the service

Expand source code Browse git
def sql_conn(
    self,
    port: str | None = None,
    user: str = "materialize",
) -> Connection:
    """Get a connection to run SQL queries against the service"""
    return pg8000.connect(
        host="localhost",
        port=self.node_port(name=port),
        user=user,
    )
def sql_cursor(self, port: str | None = None, user: str = 'materialize', autocommit: bool = True) ‑> pg8000.legacy.Cursor

Get a cursor to run SQL queries against the service

Expand source code Browse git
def sql_cursor(
    self,
    port: str | None = None,
    user: str = "materialize",
    autocommit: bool = True,
) -> Cursor:
    """Get a cursor to run SQL queries against the service"""
    conn = self.sql_conn(port=port, user=user)
    conn.autocommit = autocommit
    return conn.cursor()
def sql_query(self, sql: str, port: str | None = None, user: str = 'materialize') ‑> Any

Execute a SQL query against the service and return results.

Expand source code Browse git
def sql_query(
    self,
    sql: str,
    port: str | None = None,
    user: str = "materialize",
) -> Any:
    """Execute a SQL query against the service and return results."""
    with self.sql_cursor(port=port, user=user) as cursor:
        LOGGER.info(f"> {sql}")
        cursor.execute(sql)
        return cursor.fetchall()