Module materialize.cloudtest.util.environment

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

from typing import Any

import requests
from requests import Response

from materialize.cloudtest.util.authentication import AuthConfig
from materialize.cloudtest.util.common import retry
from materialize.cloudtest.util.controller import wait_for_connectable
from materialize.cloudtest.util.kubectl import Kubectl
from materialize.cloudtest.util.web_request import WebRequests


class Environment:
    def __init__(
        self,
        auth: AuthConfig,
        region_api_server_base_url: str,
        env_kubectl: Kubectl,
        sys_kubectl: Kubectl,
    ):
        self.auth = auth
        self.env_kubectl = env_kubectl
        self.sys_kubectl = sys_kubectl
        self.region_api_requests = WebRequests(
            self.auth, region_api_server_base_url, default_timeout_in_sec=45
        )
        self.create_env_assignment_get_retries = 120
        self.envd_waiting_get_env_retries = 900

    def create_environment_assignment(
        self,
        image: str | None = None,
    ) -> dict[str, Any]:
        environment_assignment = f"{self.auth.organization_id}-0"
        environment = f"environment-{environment_assignment}"

        json: dict[str, Any] = {}
        if image is not None:
            json["environmentdImageRef"] = image
        self.region_api_requests.patch(
            "/api/region",
            json,
        )

        self.env_kubectl.get_retry(
            None,
            "environment",
            environment,
            self.create_env_assignment_get_retries,
        )
        return self.sys_kubectl.get(
            None,
            "environmentassignment",
            environment_assignment,
        )

    def wait_for_environmentd(self, max_attempts: int = 300) -> dict[str, Any]:
        def get_environment() -> Response:
            response = self.region_api_requests.get(
                "/api/region",
            )
            region_info = response.json().get("regionInfo")
            assert region_info
            assert region_info.get("resolvable")
            assert region_info.get("sqlAddress")
            return response

        environment_json: dict[str, Any] = retry(
            get_environment, self.envd_waiting_get_env_retries, [AssertionError]
        ).json()
        pgwire_url = environment_json["regionInfo"]["sqlAddress"]
        (pgwire_host, pgwire_port) = pgwire_url.split(":")
        wait_for_connectable((pgwire_host, int(pgwire_port)), max_attempts)
        return environment_json

    def delete_environment_assignment(self) -> None:
        environment_assignment = f"{self.auth.organization_id}-0"
        environment = f"environment-{environment_assignment}"

        def delete_environment() -> None:
            self.region_api_requests.delete(
                "/api/region",
                # we have a 60 second timeout in the region api's load balancer
                # for this call and a 5 minute timeout in the region api (which
                # is relevant when running in kind)
                timeout_in_sec=305,
            )

        retry(delete_environment, 20, [requests.exceptions.HTTPError])

        assert (
            self.env_kubectl.get_or_none(
                namespace=None,
                resource_type="namespace",
                resource_name=environment,
            )
            is None
        )
        assert (
            self.env_kubectl.get_or_none(
                namespace=None,
                resource_type="environment",
                resource_name=environment,
            )
            is None
        )
        assert (
            self.sys_kubectl.get_or_none(
                namespace=None,
                resource_type="environmentassignment",
                resource_name=environment_assignment,
            )
            is None
        )

    def wait_for_no_environmentd(self) -> None:
        # Confirm the Region API is not returning the environment
        def get_environment() -> None:
            res = self.region_api_requests.get(
                "/api/region",
            )
            # a 204 indicates no region is found
            if res.status_code != 204:
                raise AssertionError()

        retry(get_environment, 600, [AssertionError])

        # Confirm the environment resource is gone
        environment_assignment = f"{self.auth.organization_id}-0"
        environment = f"environment-{environment_assignment}"

        def get_k8s_environment() -> None:
            assert (
                self.env_kubectl.get_or_none(
                    namespace=None,
                    resource_type="environment",
                    resource_name=environment,
                )
                is None
            )

        retry(get_k8s_environment, 600, [AssertionError])

    def await_environment_pod(
        self, kubectl: Kubectl, namespace: str, pod_name: str
    ) -> None:
        # we can't just wait, since it errors if it doesn't exist yet
        kubectl.get_retry(
            namespace=namespace,
            resource_type="pod",
            resource_name=pod_name,
            # Especially on dev stacks, this can take a little while
            max_attempts=360,
        )
        kubectl.wait(
            namespace=namespace,
            resource_type="pod",
            resource_name=pod_name,
            wait_for="condition=Ready",
            # If we're unlucky with certificates, we can take up to 10 minutes :-(
            # -- pad a bit just in case
            timeout_secs=630,
        )

    def cleanup_crds(
        self,
    ) -> None:
        environment_context = self.env_kubectl.context
        system_context = self.sys_kubectl.context

        if system_context != "kind-mzcloud":
            return

        assert "production" not in system_context
        assert "production" not in environment_context
        assert "staging" not in system_context
        assert "staging" not in environment_context

        self.sys_kubectl.delete(
            namespace=None,
            resource_type="crd",
            resource_name="environmentassignments.materialize.cloud",
        )

        self.env_kubectl.delete(
            namespace=None,
            resource_type="crd",
            resource_name="environments.materialize.cloud",
        )

        self.env_kubectl.delete(
            namespace=None,
            resource_type="crd",
            resource_name="vpcendpoints.materialize.cloud",
        )

Classes

class Environment (auth: AuthConfig, region_api_server_base_url: str, env_kubectl: Kubectl, sys_kubectl: Kubectl)
Expand source code Browse git
class Environment:
    def __init__(
        self,
        auth: AuthConfig,
        region_api_server_base_url: str,
        env_kubectl: Kubectl,
        sys_kubectl: Kubectl,
    ):
        self.auth = auth
        self.env_kubectl = env_kubectl
        self.sys_kubectl = sys_kubectl
        self.region_api_requests = WebRequests(
            self.auth, region_api_server_base_url, default_timeout_in_sec=45
        )
        self.create_env_assignment_get_retries = 120
        self.envd_waiting_get_env_retries = 900

    def create_environment_assignment(
        self,
        image: str | None = None,
    ) -> dict[str, Any]:
        environment_assignment = f"{self.auth.organization_id}-0"
        environment = f"environment-{environment_assignment}"

        json: dict[str, Any] = {}
        if image is not None:
            json["environmentdImageRef"] = image
        self.region_api_requests.patch(
            "/api/region",
            json,
        )

        self.env_kubectl.get_retry(
            None,
            "environment",
            environment,
            self.create_env_assignment_get_retries,
        )
        return self.sys_kubectl.get(
            None,
            "environmentassignment",
            environment_assignment,
        )

    def wait_for_environmentd(self, max_attempts: int = 300) -> dict[str, Any]:
        def get_environment() -> Response:
            response = self.region_api_requests.get(
                "/api/region",
            )
            region_info = response.json().get("regionInfo")
            assert region_info
            assert region_info.get("resolvable")
            assert region_info.get("sqlAddress")
            return response

        environment_json: dict[str, Any] = retry(
            get_environment, self.envd_waiting_get_env_retries, [AssertionError]
        ).json()
        pgwire_url = environment_json["regionInfo"]["sqlAddress"]
        (pgwire_host, pgwire_port) = pgwire_url.split(":")
        wait_for_connectable((pgwire_host, int(pgwire_port)), max_attempts)
        return environment_json

    def delete_environment_assignment(self) -> None:
        environment_assignment = f"{self.auth.organization_id}-0"
        environment = f"environment-{environment_assignment}"

        def delete_environment() -> None:
            self.region_api_requests.delete(
                "/api/region",
                # we have a 60 second timeout in the region api's load balancer
                # for this call and a 5 minute timeout in the region api (which
                # is relevant when running in kind)
                timeout_in_sec=305,
            )

        retry(delete_environment, 20, [requests.exceptions.HTTPError])

        assert (
            self.env_kubectl.get_or_none(
                namespace=None,
                resource_type="namespace",
                resource_name=environment,
            )
            is None
        )
        assert (
            self.env_kubectl.get_or_none(
                namespace=None,
                resource_type="environment",
                resource_name=environment,
            )
            is None
        )
        assert (
            self.sys_kubectl.get_or_none(
                namespace=None,
                resource_type="environmentassignment",
                resource_name=environment_assignment,
            )
            is None
        )

    def wait_for_no_environmentd(self) -> None:
        # Confirm the Region API is not returning the environment
        def get_environment() -> None:
            res = self.region_api_requests.get(
                "/api/region",
            )
            # a 204 indicates no region is found
            if res.status_code != 204:
                raise AssertionError()

        retry(get_environment, 600, [AssertionError])

        # Confirm the environment resource is gone
        environment_assignment = f"{self.auth.organization_id}-0"
        environment = f"environment-{environment_assignment}"

        def get_k8s_environment() -> None:
            assert (
                self.env_kubectl.get_or_none(
                    namespace=None,
                    resource_type="environment",
                    resource_name=environment,
                )
                is None
            )

        retry(get_k8s_environment, 600, [AssertionError])

    def await_environment_pod(
        self, kubectl: Kubectl, namespace: str, pod_name: str
    ) -> None:
        # we can't just wait, since it errors if it doesn't exist yet
        kubectl.get_retry(
            namespace=namespace,
            resource_type="pod",
            resource_name=pod_name,
            # Especially on dev stacks, this can take a little while
            max_attempts=360,
        )
        kubectl.wait(
            namespace=namespace,
            resource_type="pod",
            resource_name=pod_name,
            wait_for="condition=Ready",
            # If we're unlucky with certificates, we can take up to 10 minutes :-(
            # -- pad a bit just in case
            timeout_secs=630,
        )

    def cleanup_crds(
        self,
    ) -> None:
        environment_context = self.env_kubectl.context
        system_context = self.sys_kubectl.context

        if system_context != "kind-mzcloud":
            return

        assert "production" not in system_context
        assert "production" not in environment_context
        assert "staging" not in system_context
        assert "staging" not in environment_context

        self.sys_kubectl.delete(
            namespace=None,
            resource_type="crd",
            resource_name="environmentassignments.materialize.cloud",
        )

        self.env_kubectl.delete(
            namespace=None,
            resource_type="crd",
            resource_name="environments.materialize.cloud",
        )

        self.env_kubectl.delete(
            namespace=None,
            resource_type="crd",
            resource_name="vpcendpoints.materialize.cloud",
        )

Methods

def await_environment_pod(self, kubectl: Kubectl, namespace: str, pod_name: str) ‑> None
Expand source code Browse git
def await_environment_pod(
    self, kubectl: Kubectl, namespace: str, pod_name: str
) -> None:
    # we can't just wait, since it errors if it doesn't exist yet
    kubectl.get_retry(
        namespace=namespace,
        resource_type="pod",
        resource_name=pod_name,
        # Especially on dev stacks, this can take a little while
        max_attempts=360,
    )
    kubectl.wait(
        namespace=namespace,
        resource_type="pod",
        resource_name=pod_name,
        wait_for="condition=Ready",
        # If we're unlucky with certificates, we can take up to 10 minutes :-(
        # -- pad a bit just in case
        timeout_secs=630,
    )
def cleanup_crds(self) ‑> None
Expand source code Browse git
def cleanup_crds(
    self,
) -> None:
    environment_context = self.env_kubectl.context
    system_context = self.sys_kubectl.context

    if system_context != "kind-mzcloud":
        return

    assert "production" not in system_context
    assert "production" not in environment_context
    assert "staging" not in system_context
    assert "staging" not in environment_context

    self.sys_kubectl.delete(
        namespace=None,
        resource_type="crd",
        resource_name="environmentassignments.materialize.cloud",
    )

    self.env_kubectl.delete(
        namespace=None,
        resource_type="crd",
        resource_name="environments.materialize.cloud",
    )

    self.env_kubectl.delete(
        namespace=None,
        resource_type="crd",
        resource_name="vpcendpoints.materialize.cloud",
    )
def create_environment_assignment(self, image: str | None = None) ‑> dict[str, typing.Any]
Expand source code Browse git
def create_environment_assignment(
    self,
    image: str | None = None,
) -> dict[str, Any]:
    environment_assignment = f"{self.auth.organization_id}-0"
    environment = f"environment-{environment_assignment}"

    json: dict[str, Any] = {}
    if image is not None:
        json["environmentdImageRef"] = image
    self.region_api_requests.patch(
        "/api/region",
        json,
    )

    self.env_kubectl.get_retry(
        None,
        "environment",
        environment,
        self.create_env_assignment_get_retries,
    )
    return self.sys_kubectl.get(
        None,
        "environmentassignment",
        environment_assignment,
    )
def delete_environment_assignment(self) ‑> None
Expand source code Browse git
def delete_environment_assignment(self) -> None:
    environment_assignment = f"{self.auth.organization_id}-0"
    environment = f"environment-{environment_assignment}"

    def delete_environment() -> None:
        self.region_api_requests.delete(
            "/api/region",
            # we have a 60 second timeout in the region api's load balancer
            # for this call and a 5 minute timeout in the region api (which
            # is relevant when running in kind)
            timeout_in_sec=305,
        )

    retry(delete_environment, 20, [requests.exceptions.HTTPError])

    assert (
        self.env_kubectl.get_or_none(
            namespace=None,
            resource_type="namespace",
            resource_name=environment,
        )
        is None
    )
    assert (
        self.env_kubectl.get_or_none(
            namespace=None,
            resource_type="environment",
            resource_name=environment,
        )
        is None
    )
    assert (
        self.sys_kubectl.get_or_none(
            namespace=None,
            resource_type="environmentassignment",
            resource_name=environment_assignment,
        )
        is None
    )
def wait_for_environmentd(self, max_attempts: int = 300) ‑> dict[str, typing.Any]
Expand source code Browse git
def wait_for_environmentd(self, max_attempts: int = 300) -> dict[str, Any]:
    def get_environment() -> Response:
        response = self.region_api_requests.get(
            "/api/region",
        )
        region_info = response.json().get("regionInfo")
        assert region_info
        assert region_info.get("resolvable")
        assert region_info.get("sqlAddress")
        return response

    environment_json: dict[str, Any] = retry(
        get_environment, self.envd_waiting_get_env_retries, [AssertionError]
    ).json()
    pgwire_url = environment_json["regionInfo"]["sqlAddress"]
    (pgwire_host, pgwire_port) = pgwire_url.split(":")
    wait_for_connectable((pgwire_host, int(pgwire_port)), max_attempts)
    return environment_json
def wait_for_no_environmentd(self) ‑> None
Expand source code Browse git
def wait_for_no_environmentd(self) -> None:
    # Confirm the Region API is not returning the environment
    def get_environment() -> None:
        res = self.region_api_requests.get(
            "/api/region",
        )
        # a 204 indicates no region is found
        if res.status_code != 204:
            raise AssertionError()

    retry(get_environment, 600, [AssertionError])

    # Confirm the environment resource is gone
    environment_assignment = f"{self.auth.organization_id}-0"
    environment = f"environment-{environment_assignment}"

    def get_k8s_environment() -> None:
        assert (
            self.env_kubectl.get_or_none(
                namespace=None,
                resource_type="environment",
                resource_name=environment,
            )
            is None
        )

    retry(get_k8s_environment, 600, [AssertionError])