Module materialize.cloudtest.util.web_request

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import logging
from collections.abc import Generator
from contextlib import contextmanager
from ipaddress import IPv4Address, IPv6Address
from textwrap import dedent
from typing import Any
from urllib.parse import urlparse

import requests
from requests.adapters import DEFAULT_POOLBLOCK, HTTPAdapter, Retry

from materialize.cloudtest.util.authentication import AuthConfig

LOGGER = logging.getLogger(__name__)


@contextmanager
def verbose_http_errors() -> Generator[None, None, None]:
    """Log the details of a ``requests.HTTPError`` raised in the body, then re-raise it.

    For errors that carry a response, the status code, reason and raw content
    are logged. Exceptions of any other type pass through untouched.
    """
    try:
        yield
    except requests.HTTPError as e:
        # ``e.response`` may be None when the error was raised without one;
        # guard so an AttributeError here cannot mask the original error.
        if e.response is None:
            LOGGER.error(f"HTTP error without a response: {e}")
        else:
            LOGGER.error(
                dedent(
                    f"""
                    response status: {e.response.status_code}
                    response reason: {e.response.reason}
                    response content: {e.response.content}
                    """
                )
            )
        raise


class DNSResolverHTTPSAdapter(HTTPAdapter):
    """HTTP adapter that connects to a fixed IP address for ``common_name``.

    URLs whose host is ``common_name`` are opened against ``ip`` instead of the
    address DNS would return. TLS keeps using ``common_name`` for SNI
    (``server_hostname``) and for certificate hostname checks (``assert_hostname``).

    :param common_name: hostname used in request URLs and expected in the
        server certificate.
    :param ip: address to connect to; stored as ``str(ip)`` (for example an
        ``IPv4Address`` or ``IPv6Address``).
    :param kwargs: forwarded to ``HTTPAdapter`` (e.g. ``max_retries``).
    """

    def __init__(self, common_name, ip, **kwargs):
        self.__common_name = common_name
        self.__ip = str(ip)
        super().__init__(**kwargs)

    def get_connection(self, url, proxies=None):
        """Return the connection pool for ``url`` with its host replaced by the IP.

        Only the host in the URL's authority is rewritten, and only when it
        matches ``common_name``. Userinfo, port, path and query are kept, so a
        path or query that happens to contain the hostname is left untouched.
        """
        parsed = urlparse(url)
        redirected_url = url
        if parsed.hostname == self.__common_name.lower():
            host = self.__ip
            if ":" in host and not host.startswith("["):
                # IPv6 literals must be wrapped in brackets inside a URL.
                host = f"[{host}]"
            if parsed.port is not None:
                host = f"{host}:{parsed.port}"
            userinfo, at, _ = parsed.netloc.rpartition("@")
            # The first "//<netloc>" in a URL is its authority component.
            redirected_url = url.replace(
                f"//{parsed.netloc}", f"//{userinfo}{at}{host}", 1
            )
        LOGGER.info(f"original url: {url}")
        LOGGER.info(f"redirected url: {redirected_url}")
        return super().get_connection(
            redirected_url,
            proxies=proxies,
        )

    def init_poolmanager(
        self,
        connections,
        maxsize,
        block=DEFAULT_POOLBLOCK,
        **pool_kwargs,
    ):
        """Create the pool manager with TLS SNI and hostname checks pinned to ``common_name``.

        The connection host is the IP substituted in ``get_connection``, so the
        TLS hostname is supplied explicitly through the pool keyword arguments.
        """
        tls_hostname = self.__common_name
        pool_kwargs.update(assert_hostname=tls_hostname, server_hostname=tls_hostname)
        super().init_poolmanager(connections, maxsize, block, **pool_kwargs)


class WebRequests:
    """HTTP client for a single service rooted at ``base_url``.

    Each request goes through a fresh ``requests.Session`` that is closed when
    the call returns. Error statuses raise ``requests.HTTPError``, which is
    logged via ``verbose_http_errors``. When ``auth`` is set, an HTTP 401
    triggers ``auth.refresh()`` and one retry of the request.

    :param auth: credentials whose ``token`` is sent as a bearer token, or
        ``None`` to send no ``Authorization`` header.
    :param base_url: prefix that every request ``path`` is appended to.
    :param client_cert: client certificate, passed to requests as ``cert``.
    :param additional_headers: extra headers sent with every request.
    :param default_timeout_in_sec: timeout used when a call passes none.
    :param override_ip: if set, connect to this IP instead of resolving the
        hostname of ``base_url`` (see ``DNSResolverHTTPSAdapter``).
    :param verify: passed to requests as ``verify``.
    """

    def __init__(
        self,
        auth: AuthConfig | None,
        base_url: str,
        client_cert: tuple[str, str] | None = None,
        additional_headers: dict[str, str] | None = None,
        default_timeout_in_sec: int = 15,
        override_ip: IPv4Address | IPv6Address | None = None,
        verify: str | None = None,
    ):
        self.auth = auth
        self.base_url = base_url
        self.client_cert = client_cert
        self.additional_headers = additional_headers
        self.default_timeout_in_sec = default_timeout_in_sec
        self.override_ip = override_ip
        self.verify = verify

    def session(self) -> requests.Session:
        """Build a new ``requests.Session`` for talking to ``base_url``.

        When ``override_ip`` is set, a ``DNSResolverHTTPSAdapter`` is mounted
        for the lower-cased base URL. Connections then go to that IP, while TLS
        keeps using the URL's hostname. The caller owns the returned session
        and should close it.
        """
        new_session = requests.Session()
        if self.override_ip is None:
            return new_session
        hostname = urlparse(self.base_url).netloc.split(":", 1)[0]
        adapter = DNSResolverHTTPSAdapter(hostname, self.override_ip)
        new_session.mount(self.base_url.lower(), adapter)
        return new_session

    def get(
        self,
        path: str,
        timeout_in_sec: int | None = None,
    ) -> requests.Response:
        """Send a GET request to ``base_url + path`` and return the response.

        The request uses the urllib3 retry policy ``Retry(3)``. Raises
        ``requests.HTTPError`` for error statuses. On HTTP 401 with ``auth``
        set, refreshes the credentials and retries once.
        """
        LOGGER.info(f"GET {self.base_url}{path}")

        def try_get() -> requests.Response:
            # Closing the session releases its pooled connections; requests
            # reads non-streamed bodies eagerly, so the response stays usable.
            with verbose_http_errors(), self.session() as s:
                retry_policy = Retry(3)
                if self.override_ip is None:
                    s.mount(self.base_url, HTTPAdapter(max_retries=retry_policy))
                else:
                    # Mounting a second adapter for base_url would collide with
                    # the IP-override adapter session() installed for the same
                    # prefix, so one of them would be ignored. Attach the retry
                    # policy to that adapter instead.
                    s.get_adapter(self.base_url).max_retries = retry_policy
                response = s.get(
                    f"{self.base_url}{path}",
                    headers=self._create_headers(self.auth),
                    timeout=self._timeout_or_default(timeout_in_sec),
                    cert=self.client_cert,
                    verify=self.verify,
                )
                response.raise_for_status()
                return response

        try:
            response = try_get()
        except requests.exceptions.HTTPError as e:
            if self.auth and e.response.status_code == 401:
                self.auth.refresh()
                response = try_get()
            else:
                raise

        return response

    def post(
        self,
        path: str,
        json: Any,
        timeout_in_sec: int | None = None,
    ) -> requests.Response:
        """POST ``json`` to ``base_url + path`` and return the response.

        Raises ``requests.HTTPError`` for error statuses. On HTTP 401 with
        ``auth`` set, refreshes the credentials and retries once.
        """
        LOGGER.info(f"POST {self.base_url}{path}")

        def try_post() -> requests.Response:
            # The session is closed on exit so its connections are released.
            with verbose_http_errors(), self.session() as s:
                response = s.post(
                    f"{self.base_url}{path}",
                    headers=self._create_headers(self.auth),
                    json=json,
                    timeout=self._timeout_or_default(timeout_in_sec),
                    cert=self.client_cert,
                    verify=self.verify,
                )
                response.raise_for_status()
                return response

        try:
            response = try_post()
        except requests.exceptions.HTTPError as e:
            if self.auth and e.response.status_code == 401:
                self.auth.refresh()
                response = try_post()
            else:
                raise

        return response

    def patch(
        self,
        path: str,
        json: Any,
        timeout_in_sec: int | None = None,
    ) -> requests.Response:
        """PATCH ``base_url + path`` with ``json`` and return the response.

        Raises ``requests.HTTPError`` for error statuses. On HTTP 401 with
        ``auth`` set, refreshes the credentials and retries once.
        """
        LOGGER.info(f"PATCH {self.base_url}{path}")

        def try_patch() -> requests.Response:
            # The session is closed on exit so its connections are released.
            with verbose_http_errors(), self.session() as s:
                response = s.patch(
                    f"{self.base_url}{path}",
                    headers=self._create_headers(self.auth),
                    json=json,
                    timeout=self._timeout_or_default(timeout_in_sec),
                    cert=self.client_cert,
                    verify=self.verify,
                )
                response.raise_for_status()
                return response

        try:
            response = try_patch()
        except requests.exceptions.HTTPError as e:
            if self.auth and e.response.status_code == 401:
                self.auth.refresh()
                response = try_patch()
            else:
                raise

        return response

    def delete(
        self,
        path: str,
        params: Any = None,
        timeout_in_sec: int | None = None,
    ) -> requests.Response:
        """Send a DELETE request to ``base_url + path`` and return the response.

        ``params`` is sent as the query string when given. Raises
        ``requests.HTTPError`` for error statuses. On HTTP 401 with ``auth``
        set, refreshes the credentials and retries once.
        """
        LOGGER.info(f"DELETE {self.base_url}{path}")

        def try_delete() -> requests.Response:
            # The session is closed on exit so its connections are released.
            with verbose_http_errors(), self.session() as s:
                response = s.delete(
                    f"{self.base_url}{path}",
                    headers=self._create_headers(self.auth),
                    # params=None is requests' own default, i.e. no query string.
                    params=params,
                    timeout=self._timeout_or_default(timeout_in_sec),
                    cert=self.client_cert,
                    verify=self.verify,
                )
                response.raise_for_status()
                return response

        try:
            response = try_delete()
        except requests.exceptions.HTTPError as e:
            if self.auth and e.response.status_code == 401:
                self.auth.refresh()
                response = try_delete()
            else:
                raise

        return response

    def _create_headers(self, auth: AuthConfig | None) -> dict[str, Any]:
        """Return ``additional_headers`` plus a bearer ``Authorization`` header if ``auth`` is set.

        A copy is built so the shared ``additional_headers`` dict is never modified.
        """
        headers = self.additional_headers.copy() if self.additional_headers else {}
        if auth:
            headers["Authorization"] = f"Bearer {auth.token}"

        return headers

    def _timeout_or_default(self, timeout_in_sec: int | None) -> int:
        """Return ``timeout_in_sec``, or ``default_timeout_in_sec`` when it is ``None`` or ``0``."""
        return timeout_in_sec or self.default_timeout_in_sec

Functions

def verbose_http_errors() ‑> collections.abc.Generator[None, None, None]
Expand source code Browse git
@contextmanager
def verbose_http_errors() -> Generator[None, None, None]:
    """Log the details of a ``requests.HTTPError`` raised in the body, then re-raise it.

    For errors that carry a response, the status code, reason and raw content
    are logged. Exceptions of any other type pass through untouched.
    """
    try:
        yield
    except requests.HTTPError as e:
        # ``e.response`` may be None when the error was raised without one;
        # guard so an AttributeError here cannot mask the original error.
        if e.response is None:
            LOGGER.error(f"HTTP error without a response: {e}")
        else:
            LOGGER.error(
                dedent(
                    f"""
                    response status: {e.response.status_code}
                    response reason: {e.response.reason}
                    response content: {e.response.content}
                    """
                )
            )
        raise

Classes

class DNSResolverHTTPSAdapter (common_name, ip, **kwargs)

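HTTPS adapter that connects to a fixed IP address instead of resolving `common_name`, while still using `common_name` for TLS SNI and certificate hostname verification. Extra keyword arguments are forwarded to `HTTPAdapter`. The remainder of this description is inherited from `requests.adapters.HTTPAdapter`, the built-in HTTP Adapter for urllib3.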

Provides a general-case interface for Requests sessions to contact HTTP and HTTPS URLs by implementing the Transport Adapter interface. This class will usually be created by the :class:`Session <Session>` class under the covers.

:param pool_connections: The number of urllib3 connection pools to cache.
:param pool_maxsize: The maximum number of connections to save in the pool.
:param max_retries: The maximum number of retries each connection should attempt. Note, this applies only to failed DNS lookups, socket connections and connection timeouts, never to requests where data has made it to the server. By default, Requests does not retry failed connections. If you need granular control over the conditions under which we retry a request, import urllib3's Retry class and pass that instead.
:param pool_block: Whether the connection pool should block for connections.

Usage::

import requests
s = requests.Session()
a = requests.adapters.HTTPAdapter(max_retries=3)
s.mount('http://', a)

Expand source code Browse git
class DNSResolverHTTPSAdapter(HTTPAdapter):
    """HTTP adapter that connects to a fixed IP address for ``common_name``.

    URLs whose host is ``common_name`` are opened against ``ip`` instead of the
    address DNS would return. TLS keeps using ``common_name`` for SNI
    (``server_hostname``) and for certificate hostname checks (``assert_hostname``).

    :param common_name: hostname used in request URLs and expected in the
        server certificate.
    :param ip: address to connect to; stored as ``str(ip)`` (for example an
        ``IPv4Address`` or ``IPv6Address``).
    :param kwargs: forwarded to ``HTTPAdapter`` (e.g. ``max_retries``).
    """

    def __init__(self, common_name, ip, **kwargs):
        self.__common_name = common_name
        self.__ip = str(ip)
        super().__init__(**kwargs)

    def get_connection(self, url, proxies=None):
        """Return the connection pool for ``url`` with its host replaced by the IP.

        Only the host in the URL's authority is rewritten, and only when it
        matches ``common_name``. Userinfo, port, path and query are kept, so a
        path or query that happens to contain the hostname is left untouched.
        """
        parsed = urlparse(url)
        redirected_url = url
        if parsed.hostname == self.__common_name.lower():
            host = self.__ip
            if ":" in host and not host.startswith("["):
                # IPv6 literals must be wrapped in brackets inside a URL.
                host = f"[{host}]"
            if parsed.port is not None:
                host = f"{host}:{parsed.port}"
            userinfo, at, _ = parsed.netloc.rpartition("@")
            # The first "//<netloc>" in a URL is its authority component.
            redirected_url = url.replace(
                f"//{parsed.netloc}", f"//{userinfo}{at}{host}", 1
            )
        LOGGER.info(f"original url: {url}")
        LOGGER.info(f"redirected url: {redirected_url}")
        return super().get_connection(
            redirected_url,
            proxies=proxies,
        )

    def init_poolmanager(
        self,
        connections,
        maxsize,
        block=DEFAULT_POOLBLOCK,
        **pool_kwargs,
    ):
        """Create the pool manager with TLS SNI and hostname checks pinned to ``common_name``.

        The connection host is the IP substituted in ``get_connection``, so the
        TLS hostname is supplied explicitly through the pool keyword arguments.
        """
        tls_hostname = self.__common_name
        pool_kwargs.update(assert_hostname=tls_hostname, server_hostname=tls_hostname)
        super().init_poolmanager(connections, maxsize, block, **pool_kwargs)

Ancestors

  • requests.adapters.HTTPAdapter
  • requests.adapters.BaseAdapter

Methods

def get_connection(self, url, proxies=None)

Returns a urllib3 connection for the given URL. This should not be called from user code, and is only exposed for use when subclassing the :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.

:param url: The URL to connect to.
:param proxies: (optional) A Requests-style dictionary of proxies used on this request.
:rtype: urllib3.ConnectionPool

Expand source code Browse git
def get_connection(self, url, proxies=None):
    """Return the connection pool for ``url`` with its host replaced by the IP.

    Only the host in the URL's authority is rewritten, and only when it
    matches ``common_name``. Userinfo, port, path and query are kept, so a
    path or query that happens to contain the hostname is left untouched.
    """
    parsed = urlparse(url)
    redirected_url = url
    if parsed.hostname == self.__common_name.lower():
        host = self.__ip
        if ":" in host and not host.startswith("["):
            # IPv6 literals must be wrapped in brackets inside a URL.
            host = f"[{host}]"
        if parsed.port is not None:
            host = f"{host}:{parsed.port}"
        userinfo, at, _ = parsed.netloc.rpartition("@")
        # The first "//<netloc>" in a URL is its authority component.
        redirected_url = url.replace(
            f"//{parsed.netloc}", f"//{userinfo}{at}{host}", 1
        )
    LOGGER.info(f"original url: {url}")
    LOGGER.info(f"redirected url: {redirected_url}")
    return super().get_connection(
        redirected_url,
        proxies=proxies,
    )
def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs)

Initializes a urllib3 PoolManager.

This method should not be called from user code, and is only exposed for use when subclassing the :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.

:param connections: The number of urllib3 connection pools to cache.
:param maxsize: The maximum number of connections to save in the pool.
:param block: Block when no free connections are available.
:param pool_kwargs: Extra keyword arguments used to initialize the Pool Manager.

Expand source code Browse git
def init_poolmanager(
    self,
    connections,
    maxsize,
    block=DEFAULT_POOLBLOCK,
    **pool_kwargs,
):
    """Create the pool manager with TLS SNI and hostname checks pinned to ``common_name``.

    The connection host is the IP substituted in ``get_connection``, so the
    TLS hostname is supplied explicitly through the pool keyword arguments.
    """
    tls_hostname = self.__common_name
    pool_kwargs.update(assert_hostname=tls_hostname, server_hostname=tls_hostname)
    super().init_poolmanager(connections, maxsize, block, **pool_kwargs)
class WebRequests (auth: AuthConfig | None, base_url: str, client_cert: tuple[str, str] | None = None, additional_headers: dict[str, str] | None = None, default_timeout_in_sec: int = 15, override_ip: ipaddress.IPv4Address | ipaddress.IPv6Address | None = None, verify: str | None = None)
Expand source code Browse git
class WebRequests:
    """HTTP client for a single service rooted at ``base_url``.

    Each request goes through a fresh ``requests.Session`` that is closed when
    the call returns. Error statuses raise ``requests.HTTPError``, which is
    logged via ``verbose_http_errors``. When ``auth`` is set, an HTTP 401
    triggers ``auth.refresh()`` and one retry of the request.

    :param auth: credentials whose ``token`` is sent as a bearer token, or
        ``None`` to send no ``Authorization`` header.
    :param base_url: prefix that every request ``path`` is appended to.
    :param client_cert: client certificate, passed to requests as ``cert``.
    :param additional_headers: extra headers sent with every request.
    :param default_timeout_in_sec: timeout used when a call passes none.
    :param override_ip: if set, connect to this IP instead of resolving the
        hostname of ``base_url`` (see ``DNSResolverHTTPSAdapter``).
    :param verify: passed to requests as ``verify``.
    """

    def __init__(
        self,
        auth: AuthConfig | None,
        base_url: str,
        client_cert: tuple[str, str] | None = None,
        additional_headers: dict[str, str] | None = None,
        default_timeout_in_sec: int = 15,
        override_ip: IPv4Address | IPv6Address | None = None,
        verify: str | None = None,
    ):
        self.auth = auth
        self.base_url = base_url
        self.client_cert = client_cert
        self.additional_headers = additional_headers
        self.default_timeout_in_sec = default_timeout_in_sec
        self.override_ip = override_ip
        self.verify = verify

    def session(self) -> requests.Session:
        """Build a new ``requests.Session`` for talking to ``base_url``.

        When ``override_ip`` is set, a ``DNSResolverHTTPSAdapter`` is mounted
        for the lower-cased base URL. Connections then go to that IP, while TLS
        keeps using the URL's hostname. The caller owns the returned session
        and should close it.
        """
        new_session = requests.Session()
        if self.override_ip is None:
            return new_session
        hostname = urlparse(self.base_url).netloc.split(":", 1)[0]
        adapter = DNSResolverHTTPSAdapter(hostname, self.override_ip)
        new_session.mount(self.base_url.lower(), adapter)
        return new_session

    def get(
        self,
        path: str,
        timeout_in_sec: int | None = None,
    ) -> requests.Response:
        """Send a GET request to ``base_url + path`` and return the response.

        The request uses the urllib3 retry policy ``Retry(3)``. Raises
        ``requests.HTTPError`` for error statuses. On HTTP 401 with ``auth``
        set, refreshes the credentials and retries once.
        """
        LOGGER.info(f"GET {self.base_url}{path}")

        def try_get() -> requests.Response:
            # Closing the session releases its pooled connections; requests
            # reads non-streamed bodies eagerly, so the response stays usable.
            with verbose_http_errors(), self.session() as s:
                retry_policy = Retry(3)
                if self.override_ip is None:
                    s.mount(self.base_url, HTTPAdapter(max_retries=retry_policy))
                else:
                    # Mounting a second adapter for base_url would collide with
                    # the IP-override adapter session() installed for the same
                    # prefix, so one of them would be ignored. Attach the retry
                    # policy to that adapter instead.
                    s.get_adapter(self.base_url).max_retries = retry_policy
                response = s.get(
                    f"{self.base_url}{path}",
                    headers=self._create_headers(self.auth),
                    timeout=self._timeout_or_default(timeout_in_sec),
                    cert=self.client_cert,
                    verify=self.verify,
                )
                response.raise_for_status()
                return response

        try:
            response = try_get()
        except requests.exceptions.HTTPError as e:
            if self.auth and e.response.status_code == 401:
                self.auth.refresh()
                response = try_get()
            else:
                raise

        return response

    def post(
        self,
        path: str,
        json: Any,
        timeout_in_sec: int | None = None,
    ) -> requests.Response:
        """POST ``json`` to ``base_url + path`` and return the response.

        Raises ``requests.HTTPError`` for error statuses. On HTTP 401 with
        ``auth`` set, refreshes the credentials and retries once.
        """
        LOGGER.info(f"POST {self.base_url}{path}")

        def try_post() -> requests.Response:
            # The session is closed on exit so its connections are released.
            with verbose_http_errors(), self.session() as s:
                response = s.post(
                    f"{self.base_url}{path}",
                    headers=self._create_headers(self.auth),
                    json=json,
                    timeout=self._timeout_or_default(timeout_in_sec),
                    cert=self.client_cert,
                    verify=self.verify,
                )
                response.raise_for_status()
                return response

        try:
            response = try_post()
        except requests.exceptions.HTTPError as e:
            if self.auth and e.response.status_code == 401:
                self.auth.refresh()
                response = try_post()
            else:
                raise

        return response

    def patch(
        self,
        path: str,
        json: Any,
        timeout_in_sec: int | None = None,
    ) -> requests.Response:
        """PATCH ``base_url + path`` with ``json`` and return the response.

        Raises ``requests.HTTPError`` for error statuses. On HTTP 401 with
        ``auth`` set, refreshes the credentials and retries once.
        """
        LOGGER.info(f"PATCH {self.base_url}{path}")

        def try_patch() -> requests.Response:
            # The session is closed on exit so its connections are released.
            with verbose_http_errors(), self.session() as s:
                response = s.patch(
                    f"{self.base_url}{path}",
                    headers=self._create_headers(self.auth),
                    json=json,
                    timeout=self._timeout_or_default(timeout_in_sec),
                    cert=self.client_cert,
                    verify=self.verify,
                )
                response.raise_for_status()
                return response

        try:
            response = try_patch()
        except requests.exceptions.HTTPError as e:
            if self.auth and e.response.status_code == 401:
                self.auth.refresh()
                response = try_patch()
            else:
                raise

        return response

    def delete(
        self,
        path: str,
        params: Any = None,
        timeout_in_sec: int | None = None,
    ) -> requests.Response:
        """Send a DELETE request to ``base_url + path`` and return the response.

        ``params`` is sent as the query string when given. Raises
        ``requests.HTTPError`` for error statuses. On HTTP 401 with ``auth``
        set, refreshes the credentials and retries once.
        """
        LOGGER.info(f"DELETE {self.base_url}{path}")

        def try_delete() -> requests.Response:
            # The session is closed on exit so its connections are released.
            with verbose_http_errors(), self.session() as s:
                response = s.delete(
                    f"{self.base_url}{path}",
                    headers=self._create_headers(self.auth),
                    # params=None is requests' own default, i.e. no query string.
                    params=params,
                    timeout=self._timeout_or_default(timeout_in_sec),
                    cert=self.client_cert,
                    verify=self.verify,
                )
                response.raise_for_status()
                return response

        try:
            response = try_delete()
        except requests.exceptions.HTTPError as e:
            if self.auth and e.response.status_code == 401:
                self.auth.refresh()
                response = try_delete()
            else:
                raise

        return response

    def _create_headers(self, auth: AuthConfig | None) -> dict[str, Any]:
        """Return ``additional_headers`` plus a bearer ``Authorization`` header if ``auth`` is set.

        A copy is built so the shared ``additional_headers`` dict is never modified.
        """
        headers = self.additional_headers.copy() if self.additional_headers else {}
        if auth:
            headers["Authorization"] = f"Bearer {auth.token}"

        return headers

    def _timeout_or_default(self, timeout_in_sec: int | None) -> int:
        """Return ``timeout_in_sec``, or ``default_timeout_in_sec`` when it is ``None`` or ``0``."""
        return timeout_in_sec or self.default_timeout_in_sec

Methods

def delete(self, path: str, params: Any = None, timeout_in_sec: int | None = None) ‑> requests.models.Response
Expand source code Browse git
def delete(
    self,
    path: str,
    params: Any = None,
    timeout_in_sec: int | None = None,
) -> requests.Response:
    """Send a DELETE request to ``base_url + path`` and return the response.

    ``params`` is sent as the query string when given. Raises
    ``requests.HTTPError`` for error statuses. On HTTP 401 with ``auth`` set,
    refreshes the credentials and retries once.
    """
    LOGGER.info(f"DELETE {self.base_url}{path}")

    def try_delete() -> requests.Response:
        # The session is closed on exit so its connections are released.
        with verbose_http_errors(), self.session() as s:
            response = s.delete(
                f"{self.base_url}{path}",
                headers=self._create_headers(self.auth),
                # params=None is requests' own default, i.e. no query string.
                params=params,
                timeout=self._timeout_or_default(timeout_in_sec),
                cert=self.client_cert,
                verify=self.verify,
            )
            response.raise_for_status()
            return response

    try:
        response = try_delete()
    except requests.exceptions.HTTPError as e:
        if self.auth and e.response.status_code == 401:
            self.auth.refresh()
            response = try_delete()
        else:
            raise

    return response
def get(self, path: str, timeout_in_sec: int | None = None) ‑> requests.models.Response
Expand source code Browse git
def get(
    self,
    path: str,
    timeout_in_sec: int | None = None,
) -> requests.Response:
    """Send a GET request to ``base_url + path`` and return the response.

    The request uses the urllib3 retry policy ``Retry(3)``. Raises
    ``requests.HTTPError`` for error statuses. On HTTP 401 with ``auth`` set,
    refreshes the credentials and retries once.
    """
    LOGGER.info(f"GET {self.base_url}{path}")

    def try_get() -> requests.Response:
        # Closing the session releases its pooled connections; requests reads
        # non-streamed bodies eagerly, so the response stays usable.
        with verbose_http_errors(), self.session() as s:
            retry_policy = Retry(3)
            if self.override_ip is None:
                s.mount(self.base_url, HTTPAdapter(max_retries=retry_policy))
            else:
                # Mounting a second adapter for base_url would collide with the
                # IP-override adapter session() installed for the same prefix,
                # so one of them would be ignored. Attach the retry policy to
                # that adapter instead.
                s.get_adapter(self.base_url).max_retries = retry_policy
            response = s.get(
                f"{self.base_url}{path}",
                headers=self._create_headers(self.auth),
                timeout=self._timeout_or_default(timeout_in_sec),
                cert=self.client_cert,
                verify=self.verify,
            )
            response.raise_for_status()
            return response

    try:
        response = try_get()
    except requests.exceptions.HTTPError as e:
        if self.auth and e.response.status_code == 401:
            self.auth.refresh()
            response = try_get()
        else:
            raise

    return response
def patch(self, path: str, json: Any, timeout_in_sec: int | None = None) ‑> requests.models.Response
Expand source code Browse git
def patch(
    self,
    path: str,
    json: Any,
    timeout_in_sec: int | None = None,
) -> requests.Response:
    """PATCH ``base_url + path`` with ``json`` and return the response.

    Raises ``requests.HTTPError`` for error statuses. On HTTP 401 with
    ``auth`` set, refreshes the credentials and retries once.
    """
    LOGGER.info(f"PATCH {self.base_url}{path}")

    def try_patch() -> requests.Response:
        # The session is closed on exit so its connections are released.
        with verbose_http_errors(), self.session() as s:
            response = s.patch(
                f"{self.base_url}{path}",
                headers=self._create_headers(self.auth),
                json=json,
                timeout=self._timeout_or_default(timeout_in_sec),
                cert=self.client_cert,
                verify=self.verify,
            )
            response.raise_for_status()
            return response

    try:
        response = try_patch()
    except requests.exceptions.HTTPError as e:
        if self.auth and e.response.status_code == 401:
            self.auth.refresh()
            response = try_patch()
        else:
            raise

    return response
def post(self, path: str, json: Any, timeout_in_sec: int | None = None) ‑> requests.models.Response
Expand source code Browse git
def post(
    self,
    path: str,
    json: Any,
    timeout_in_sec: int | None = None,
) -> requests.Response:
    """POST ``json`` to ``base_url + path`` and return the response.

    Raises ``requests.HTTPError`` for error statuses. On HTTP 401 with
    ``auth`` set, refreshes the credentials and retries once.
    """
    LOGGER.info(f"POST {self.base_url}{path}")

    def try_post() -> requests.Response:
        # The session is closed on exit so its connections are released.
        with verbose_http_errors(), self.session() as s:
            response = s.post(
                f"{self.base_url}{path}",
                headers=self._create_headers(self.auth),
                json=json,
                timeout=self._timeout_or_default(timeout_in_sec),
                cert=self.client_cert,
                verify=self.verify,
            )
            response.raise_for_status()
            return response

    try:
        response = try_post()
    except requests.exceptions.HTTPError as e:
        if self.auth and e.response.status_code == 401:
            self.auth.refresh()
            response = try_post()
        else:
            raise

    return response
def session(self) ‑> requests.sessions.Session
Expand source code Browse git
def session(self) -> requests.Session:
    """Build a new ``requests.Session`` for talking to ``base_url``.

    When ``override_ip`` is set, a ``DNSResolverHTTPSAdapter`` is mounted for
    the lower-cased base URL. Connections then go to that IP, while TLS keeps
    using the URL's hostname. The caller owns the returned session and should
    close it.
    """
    new_session = requests.Session()
    if self.override_ip is None:
        return new_session
    hostname = urlparse(self.base_url).netloc.split(":", 1)[0]
    adapter = DNSResolverHTTPSAdapter(hostname, self.override_ip)
    new_session.mount(self.base_url.lower(), adapter)
    return new_session