Module materialize.cloudtest.util.controller

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# pyright: reportMissingImports=false
import logging
import socket
import subprocess
import urllib.parse
from dataclasses import dataclass
from typing import Any

from materialize.cloudtest.util.authentication import AuthConfig
from materialize.cloudtest.util.common import log_subprocess_error, retry
from materialize.cloudtest.util.web_request import WebRequests

LOGGER = logging.getLogger(__name__)


@dataclass
class Endpoint:
    scheme: str
    host: str
    port: int

    @property
    def base_url(self) -> str:
        return f"{self.scheme}://{self.host}:{self.port}"

    @property
    def host_port(self) -> tuple[str, int]:
        return (self.host, self.port)

    @classmethod
    def parse(cls, s: str) -> "Endpoint":
        u = parse_url(s)
        assert u.hostname is not None and u.port is not None
        return cls(scheme=u.scheme or "http", host=u.hostname, port=u.port)


@dataclass
class ControllerDefinition:
    name: str
    default_port: str
    has_configurable_address: bool = True
    endpoint: Endpoint | None = None
    client_cert: tuple[str, str] | None = None

    def default_address(self) -> str:
        return f"http://127.0.0.1:{self.default_port}"

    def configured_base_url(self) -> str:
        if self.endpoint is None:
            raise RuntimeError("Endpoint not configured")

        return self.endpoint.base_url

    def requests(
        self,
        auth: AuthConfig | None,
        client_cert: tuple[str, str] | None = None,
        additional_headers: dict[str, str] | None = None,
    ) -> WebRequests:
        return WebRequests(
            auth,
            self.configured_base_url(),
            client_cert=client_cert,
            additional_headers=additional_headers,
        )


def wait_for_connectable(
    address: tuple[Any, int] | str,
    max_attempts: int = 30,
) -> None:
    def f() -> None:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect(address)

    retry(
        f,
        max_attempts=max_attempts,
        exception_types=[ConnectionRefusedError, socket.gaierror, socket.timeout],
        message=f"Error connecting to {address}. Tried {max_attempts} times.",
    )


def parse_url(s: str) -> urllib.parse.ParseResult:
    """
    >>> parse_url('127.0.0.1:8002').port
    8002
    >>> parse_url('127.0.0.1:8002').hostname
    '127.0.0.1'
    >>> parse_url('the men who stare at goats')
    Traceback (most recent call last):
      File "/nix/store/7awm88zrzq5c0qks8ypf8s8jblm4r3i2-python3-3.9.16/lib/python3.9/doctest.py", line 1334, in __run
        exec(compile(example.source, filename, "single",
      File "<doctest __main__.parse_url[2]>", line 1, in <module>
        parse_url('the men who stare at goats')
      File "/Users/rami/Code/cloud/k8s_tests/util.py", line 343, in parse_url
        raise ValueError(s)
    ValueError: //the men who stare at goats
    """
    try:
        parsed = urllib.parse.urlparse(s)
        assert parsed.netloc is not None and parsed.port is not None
    except AssertionError:
        try:
            s = "//" + s
            parsed = urllib.parse.urlparse(s)
            assert parsed.netloc is not None and parsed.port is not None
        except AssertionError as e:
            raise ValueError(s) from e
    return parsed


def launch_controllers(controller_names: list[str], docker_env: dict[str, str]) -> None:
    try:
        subprocess.run(
            [
                "bin/compose",
                "up",
                "--wait",
                *controller_names,
            ],
            capture_output=True,
            check=True,
            env=docker_env,
        )
    except subprocess.CalledProcessError as e:
        log_subprocess_error(e)
        raise


def wait_for_controllers(*endpoints: Endpoint) -> None:
    for endpoint in endpoints:
        LOGGER.info(f"Waiting for {endpoint.host_port} to be connectable...")
        wait_for_connectable(endpoint.host_port)


def cleanup_controllers(docker_env: dict[str, str]) -> None:
    try:
        subprocess.run(
            ["bin/compose", "down", "-v"],
            capture_output=True,
            check=True,
            env=docker_env,
        )
    except subprocess.CalledProcessError as e:
        log_subprocess_error(e)
        raise

Functions

def cleanup_controllers(docker_env: dict[str, str]) ‑> None
Expand source code Browse git
def cleanup_controllers(docker_env: dict[str, str]) -> None:
    try:
        subprocess.run(
            ["bin/compose", "down", "-v"],
            capture_output=True,
            check=True,
            env=docker_env,
        )
    except subprocess.CalledProcessError as e:
        log_subprocess_error(e)
        raise
def launch_controllers(controller_names: list[str], docker_env: dict[str, str]) ‑> None
Expand source code Browse git
def launch_controllers(controller_names: list[str], docker_env: dict[str, str]) -> None:
    try:
        subprocess.run(
            [
                "bin/compose",
                "up",
                "--wait",
                *controller_names,
            ],
            capture_output=True,
            check=True,
            env=docker_env,
        )
    except subprocess.CalledProcessError as e:
        log_subprocess_error(e)
        raise
def parse_url(s: str) ‑> urllib.parse.ParseResult
>>> parse_url('127.0.0.1:8002').port
8002
>>> parse_url('127.0.0.1:8002').hostname
'127.0.0.1'
>>> parse_url('the men who stare at goats')
Traceback (most recent call last):
  File "/nix/store/7awm88zrzq5c0qks8ypf8s8jblm4r3i2-python3-3.9.16/lib/python3.9/doctest.py", line 1334, in __run
    exec(compile(example.source, filename, "single",
  File "<doctest __main__.parse_url[2]>", line 1, in <module>
    parse_url('the men who stare at goats')
  File "/Users/rami/Code/cloud/k8s_tests/util.py", line 343, in parse_url
    raise ValueError(s)
ValueError: //the men who stare at goats
Expand source code Browse git
def parse_url(s: str) -> urllib.parse.ParseResult:
    """
    >>> parse_url('127.0.0.1:8002').port
    8002
    >>> parse_url('127.0.0.1:8002').hostname
    '127.0.0.1'
    >>> parse_url('the men who stare at goats')
    Traceback (most recent call last):
      File "/nix/store/7awm88zrzq5c0qks8ypf8s8jblm4r3i2-python3-3.9.16/lib/python3.9/doctest.py", line 1334, in __run
        exec(compile(example.source, filename, "single",
      File "<doctest __main__.parse_url[2]>", line 1, in <module>
        parse_url('the men who stare at goats')
      File "/Users/rami/Code/cloud/k8s_tests/util.py", line 343, in parse_url
        raise ValueError(s)
    ValueError: //the men who stare at goats
    """
    try:
        parsed = urllib.parse.urlparse(s)
        assert parsed.netloc is not None and parsed.port is not None
    except AssertionError:
        try:
            s = "//" + s
            parsed = urllib.parse.urlparse(s)
            assert parsed.netloc is not None and parsed.port is not None
        except AssertionError as e:
            raise ValueError(s) from e
    return parsed
def wait_for_connectable(address: tuple[typing.Any, int] | str, max_attempts: int = 30) ‑> None
Expand source code Browse git
def wait_for_connectable(
    address: tuple[Any, int] | str,
    max_attempts: int = 30,
) -> None:
    def f() -> None:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect(address)

    retry(
        f,
        max_attempts=max_attempts,
        exception_types=[ConnectionRefusedError, socket.gaierror, socket.timeout],
        message=f"Error connecting to {address}. Tried {max_attempts} times.",
    )
def wait_for_controllers(*endpoints: Endpoint) ‑> None
Expand source code Browse git
def wait_for_controllers(*endpoints: Endpoint) -> None:
    for endpoint in endpoints:
        LOGGER.info(f"Waiting for {endpoint.host_port} to be connectable...")
        wait_for_connectable(endpoint.host_port)

Classes

class ControllerDefinition (name: str, default_port: str, has_configurable_address: bool = True, endpoint: Endpoint | None = None, client_cert: tuple[str, str] | None = None)

ControllerDefinition(name: str, default_port: str, has_configurable_address: bool = True, endpoint: materialize.cloudtest.util.controller.Endpoint | None = None, client_cert: tuple[str, str] | None = None)

Expand source code Browse git
@dataclass
class ControllerDefinition:
    name: str
    default_port: str
    has_configurable_address: bool = True
    endpoint: Endpoint | None = None
    client_cert: tuple[str, str] | None = None

    def default_address(self) -> str:
        return f"http://127.0.0.1:{self.default_port}"

    def configured_base_url(self) -> str:
        if self.endpoint is None:
            raise RuntimeError("Endpoint not configured")

        return self.endpoint.base_url

    def requests(
        self,
        auth: AuthConfig | None,
        client_cert: tuple[str, str] | None = None,
        additional_headers: dict[str, str] | None = None,
    ) -> WebRequests:
        return WebRequests(
            auth,
            self.configured_base_url(),
            client_cert=client_cert,
            additional_headers=additional_headers,
        )

Class variables

var client_cert : tuple[str, str] | None
var default_port : str
var endpointEndpoint | None
var has_configurable_address : bool
var name : str

Methods

def configured_base_url(self) ‑> str
Expand source code Browse git
def configured_base_url(self) -> str:
    if self.endpoint is None:
        raise RuntimeError("Endpoint not configured")

    return self.endpoint.base_url
def default_address(self) ‑> str
Expand source code Browse git
def default_address(self) -> str:
    return f"http://127.0.0.1:{self.default_port}"
def requests(self, auth: AuthConfig | None, client_cert: tuple[str, str] | None = None, additional_headers: dict[str, str] | None = None) ‑> WebRequests
Expand source code Browse git
def requests(
    self,
    auth: AuthConfig | None,
    client_cert: tuple[str, str] | None = None,
    additional_headers: dict[str, str] | None = None,
) -> WebRequests:
    return WebRequests(
        auth,
        self.configured_base_url(),
        client_cert=client_cert,
        additional_headers=additional_headers,
    )
class Endpoint (scheme: str, host: str, port: int)

Endpoint(scheme: str, host: str, port: int)

Expand source code Browse git
@dataclass
class Endpoint:
    scheme: str
    host: str
    port: int

    @property
    def base_url(self) -> str:
        return f"{self.scheme}://{self.host}:{self.port}"

    @property
    def host_port(self) -> tuple[str, int]:
        return (self.host, self.port)

    @classmethod
    def parse(cls, s: str) -> "Endpoint":
        u = parse_url(s)
        assert u.hostname is not None and u.port is not None
        return cls(scheme=u.scheme or "http", host=u.hostname, port=u.port)

Class variables

var host : str
var port : int
var scheme : str

Static methods

def parse(s: str) ‑> Endpoint
Expand source code Browse git
@classmethod
def parse(cls, s: str) -> "Endpoint":
    u = parse_url(s)
    assert u.hostname is not None and u.port is not None
    return cls(scheme=u.scheme or "http", host=u.hostname, port=u.port)

Instance variables

var base_url : str
Expand source code Browse git
@property
def base_url(self) -> str:
    return f"{self.scheme}://{self.host}:{self.port}"
var host_port : tuple[str, int]
Expand source code Browse git
@property
def host_port(self) -> tuple[str, int]:
    return (self.host, self.port)