Module materialize.cloudtest.util.web_request
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import logging
from collections.abc import Generator
from contextlib import contextmanager
from ipaddress import IPv4Address, IPv6Address
from textwrap import dedent
from typing import Any
from urllib.parse import urlparse
import requests
from requests.adapters import DEFAULT_POOLBLOCK, HTTPAdapter, Retry
from materialize.cloudtest.util.authentication import AuthConfig
LOGGER = logging.getLogger(__name__)
@contextmanager
def verbose_http_errors() -> Generator[None, None, None]:
try:
yield
except requests.HTTPError as e:
LOGGER.error(
dedent(
f"""
response status: {e.response.status_code}
response reason: {e.response.reason}
response content: {e.response.content}
"""
)
)
raise
class DNSResolverHTTPSAdapter(HTTPAdapter):
def __init__(self, common_name, ip, **kwargs):
self.__common_name = common_name
self.__ip = str(ip)
super().__init__(**kwargs)
def get_connection(self, url, proxies=None):
redirected_url = url.replace(self.__common_name.lower(), self.__ip)
LOGGER.info(f"original url: {url}")
LOGGER.info(f"redirected url: {redirected_url}")
return super().get_connection(
redirected_url,
proxies=proxies,
)
def init_poolmanager(
self,
connections,
maxsize,
block=DEFAULT_POOLBLOCK,
**pool_kwargs,
):
pool_kwargs["assert_hostname"] = self.__common_name
pool_kwargs["server_hostname"] = self.__common_name
super().init_poolmanager(
connections,
maxsize,
block,
**pool_kwargs,
)
class WebRequests:
def __init__(
self,
auth: AuthConfig | None,
base_url: str,
client_cert: tuple[str, str] | None = None,
additional_headers: dict[str, str] | None = None,
default_timeout_in_sec: int = 15,
override_ip: IPv4Address | IPv6Address | None = None,
verify: str | None = None,
):
self.auth = auth
self.base_url = base_url
self.client_cert = client_cert
self.additional_headers = additional_headers
self.default_timeout_in_sec = default_timeout_in_sec
self.override_ip = override_ip
self.verify = verify
def session(self) -> requests.Session:
session = requests.Session()
if self.override_ip is not None:
parsed_url = urlparse(self.base_url)
session.mount(
self.base_url.lower(),
DNSResolverHTTPSAdapter(
parsed_url.netloc.split(":", 1)[0],
self.override_ip,
),
)
return session
def get(
self,
path: str,
timeout_in_sec: int | None = None,
) -> requests.Response:
LOGGER.info(f"GET {self.base_url}{path}")
def try_get() -> requests.Response:
with verbose_http_errors():
headers = self._create_headers(self.auth)
s = self.session()
s.mount(self.base_url, HTTPAdapter(max_retries=Retry(3)))
response = s.get(
f"{self.base_url}{path}",
headers=headers,
timeout=self._timeout_or_default(timeout_in_sec),
cert=self.client_cert,
verify=self.verify,
)
response.raise_for_status()
return response
try:
response = try_get()
except requests.exceptions.HTTPError as e:
if self.auth and e.response.status_code == 401:
self.auth.refresh()
response = try_get()
else:
raise
return response
def post(
self,
path: str,
json: Any,
timeout_in_sec: int | None = None,
) -> requests.Response:
LOGGER.info(f"POST {self.base_url}{path}")
def try_post() -> requests.Response:
with verbose_http_errors():
headers = self._create_headers(self.auth)
response = self.session().post(
f"{self.base_url}{path}",
headers=headers,
json=json,
timeout=self._timeout_or_default(timeout_in_sec),
cert=self.client_cert,
verify=self.verify,
)
response.raise_for_status()
return response
try:
response = try_post()
except requests.exceptions.HTTPError as e:
if self.auth and e.response.status_code == 401:
self.auth.refresh()
response = try_post()
else:
raise
return response
def patch(
self,
path: str,
json: Any,
timeout_in_sec: int | None = None,
) -> requests.Response:
LOGGER.info(f"PATCH {self.base_url}{path}")
def try_patch() -> requests.Response:
with verbose_http_errors():
headers = self._create_headers(self.auth)
response = self.session().patch(
f"{self.base_url}{path}",
headers=headers,
json=json,
timeout=self._timeout_or_default(timeout_in_sec),
cert=self.client_cert,
verify=self.verify,
)
response.raise_for_status()
return response
try:
response = try_patch()
except requests.exceptions.HTTPError as e:
if self.auth and e.response.status_code == 401:
self.auth.refresh()
response = try_patch()
else:
raise
return response
def delete(
self,
path: str,
params: Any = None,
timeout_in_sec: int | None = None,
) -> requests.Response:
LOGGER.info(f"DELETE {self.base_url}{path}")
def try_delete() -> requests.Response:
with verbose_http_errors():
headers = self._create_headers(self.auth)
response = self.session().delete(
f"{self.base_url}{path}",
headers=headers,
timeout=self._timeout_or_default(timeout_in_sec),
cert=self.client_cert,
verify=self.verify,
**(
{
"params": params,
}
if params is not None
else {}
),
)
response.raise_for_status()
return response
try:
response = try_delete()
except requests.exceptions.HTTPError as e:
if self.auth and e.response.status_code == 401:
self.auth.refresh()
response = try_delete()
else:
raise
return response
def _create_headers(self, auth: AuthConfig | None) -> dict[str, Any]:
headers = self.additional_headers.copy() if self.additional_headers else {}
if auth:
headers["Authorization"] = f"Bearer {auth.token}"
return headers
def _timeout_or_default(self, timeout_in_sec: int | None) -> int:
return timeout_in_sec or self.default_timeout_in_sec
Functions
def verbose_http_errors() ‑> collections.abc.Generator[None, None, None]
-
Expand source code Browse git
@contextmanager def verbose_http_errors() -> Generator[None, None, None]: try: yield except requests.HTTPError as e: LOGGER.error( dedent( f""" response status: {e.response.status_code} response reason: {e.response.reason} response content: {e.response.content} """ ) ) raise
Classes
class DNSResolverHTTPSAdapter (common_name, ip, **kwargs)
-
The built-in HTTP Adapter for urllib3.
Provides a general-case interface for Requests sessions to contact HTTP and HTTPS urls by implementing the Transport Adapter interface. This class will usually be created by the :class:
Session <Session>
class under the covers.:param pool_connections: The number of urllib3 connection pools to cache. :param pool_maxsize: The maximum number of connections to save in the pool. :param max_retries: The maximum number of retries each connection should attempt. Note, this applies only to failed DNS lookups, socket connections and connection timeouts, never to requests where data has made it to the server. By default, Requests does not retry failed connections. If you need granular control over the conditions under which we retry a request, import urllib3's
Retry
class and pass that instead. :param pool_block: Whether the connection pool should block for connections.Usage::
import requests s = requests.Session() a = requests.adapters.HTTPAdapter(max_retries=3) s.mount('http://', a)
Expand source code Browse git
class DNSResolverHTTPSAdapter(HTTPAdapter): def __init__(self, common_name, ip, **kwargs): self.__common_name = common_name self.__ip = str(ip) super().__init__(**kwargs) def get_connection(self, url, proxies=None): redirected_url = url.replace(self.__common_name.lower(), self.__ip) LOGGER.info(f"original url: {url}") LOGGER.info(f"redirected url: {redirected_url}") return super().get_connection( redirected_url, proxies=proxies, ) def init_poolmanager( self, connections, maxsize, block=DEFAULT_POOLBLOCK, **pool_kwargs, ): pool_kwargs["assert_hostname"] = self.__common_name pool_kwargs["server_hostname"] = self.__common_name super().init_poolmanager( connections, maxsize, block, **pool_kwargs, )
Ancestors
- requests.adapters.HTTPAdapter
- requests.adapters.BaseAdapter
Methods
def get_connection(self, url, proxies=None)
-
Returns a urllib3 connection for the given URL. This should not be called from user code, and is only exposed for use when subclassing the :class:
HTTPAdapter <requests.adapters.HTTPAdapter>
.:param url: The URL to connect to. :param proxies: (optional) A Requests-style dictionary of proxies used on this request. :rtype: urllib3.ConnectionPool
Expand source code Browse git
def get_connection(self, url, proxies=None): redirected_url = url.replace(self.__common_name.lower(), self.__ip) LOGGER.info(f"original url: {url}") LOGGER.info(f"redirected url: {redirected_url}") return super().get_connection( redirected_url, proxies=proxies, )
def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs)
-
Initializes a urllib3 PoolManager.
This method should not be called from user code, and is only exposed for use when subclassing the :class:
HTTPAdapter <requests.adapters.HTTPAdapter>
.:param connections: The number of urllib3 connection pools to cache. :param maxsize: The maximum number of connections to save in the pool. :param block: Block when no free connections are available. :param pool_kwargs: Extra keyword arguments used to initialize the Pool Manager.
Expand source code Browse git
def init_poolmanager( self, connections, maxsize, block=DEFAULT_POOLBLOCK, **pool_kwargs, ): pool_kwargs["assert_hostname"] = self.__common_name pool_kwargs["server_hostname"] = self.__common_name super().init_poolmanager( connections, maxsize, block, **pool_kwargs, )
class WebRequests (auth: AuthConfig | None, base_url: str, client_cert: tuple[str, str] | None = None, additional_headers: dict[str, str] | None = None, default_timeout_in_sec: int = 15, override_ip: ipaddress.IPv4Address | ipaddress.IPv6Address | None = None, verify: str | None = None)
-
Expand source code Browse git
class WebRequests: def __init__( self, auth: AuthConfig | None, base_url: str, client_cert: tuple[str, str] | None = None, additional_headers: dict[str, str] | None = None, default_timeout_in_sec: int = 15, override_ip: IPv4Address | IPv6Address | None = None, verify: str | None = None, ): self.auth = auth self.base_url = base_url self.client_cert = client_cert self.additional_headers = additional_headers self.default_timeout_in_sec = default_timeout_in_sec self.override_ip = override_ip self.verify = verify def session(self) -> requests.Session: session = requests.Session() if self.override_ip is not None: parsed_url = urlparse(self.base_url) session.mount( self.base_url.lower(), DNSResolverHTTPSAdapter( parsed_url.netloc.split(":", 1)[0], self.override_ip, ), ) return session def get( self, path: str, timeout_in_sec: int | None = None, ) -> requests.Response: LOGGER.info(f"GET {self.base_url}{path}") def try_get() -> requests.Response: with verbose_http_errors(): headers = self._create_headers(self.auth) s = self.session() s.mount(self.base_url, HTTPAdapter(max_retries=Retry(3))) response = s.get( f"{self.base_url}{path}", headers=headers, timeout=self._timeout_or_default(timeout_in_sec), cert=self.client_cert, verify=self.verify, ) response.raise_for_status() return response try: response = try_get() except requests.exceptions.HTTPError as e: if self.auth and e.response.status_code == 401: self.auth.refresh() response = try_get() else: raise return response def post( self, path: str, json: Any, timeout_in_sec: int | None = None, ) -> requests.Response: LOGGER.info(f"POST {self.base_url}{path}") def try_post() -> requests.Response: with verbose_http_errors(): headers = self._create_headers(self.auth) response = self.session().post( f"{self.base_url}{path}", headers=headers, json=json, timeout=self._timeout_or_default(timeout_in_sec), cert=self.client_cert, verify=self.verify, ) response.raise_for_status() return response try: response = try_post() except requests.exceptions.HTTPError as e: if self.auth and e.response.status_code == 401: self.auth.refresh() response = try_post() else: raise return response def patch( self, path: str, json: Any, timeout_in_sec: int | None = None, ) -> requests.Response: LOGGER.info(f"PATCH {self.base_url}{path}") def try_patch() -> requests.Response: with verbose_http_errors(): headers = self._create_headers(self.auth) response = self.session().patch( f"{self.base_url}{path}", headers=headers, json=json, timeout=self._timeout_or_default(timeout_in_sec), cert=self.client_cert, verify=self.verify, ) response.raise_for_status() return response try: response = try_patch() except requests.exceptions.HTTPError as e: if self.auth and e.response.status_code == 401: self.auth.refresh() response = try_patch() else: raise return response def delete( self, path: str, params: Any = None, timeout_in_sec: int | None = None, ) -> requests.Response: LOGGER.info(f"DELETE {self.base_url}{path}") def try_delete() -> requests.Response: with verbose_http_errors(): headers = self._create_headers(self.auth) response = self.session().delete( f"{self.base_url}{path}", headers=headers, timeout=self._timeout_or_default(timeout_in_sec), cert=self.client_cert, verify=self.verify, **( { "params": params, } if params is not None else {} ), ) response.raise_for_status() return response try: response = try_delete() except requests.exceptions.HTTPError as e: if self.auth and e.response.status_code == 401: self.auth.refresh() response = try_delete() else: raise return response def _create_headers(self, auth: AuthConfig | None) -> dict[str, Any]: headers = self.additional_headers.copy() if self.additional_headers else {} if auth: headers["Authorization"] = f"Bearer {auth.token}" return headers def _timeout_or_default(self, timeout_in_sec: int | None) -> int: return timeout_in_sec or self.default_timeout_in_sec
Methods
def delete(self, path: str, params: Any = None, timeout_in_sec: int | None = None) ‑> requests.models.Response
-
Expand source code Browse git
def delete( self, path: str, params: Any = None, timeout_in_sec: int | None = None, ) -> requests.Response: LOGGER.info(f"DELETE {self.base_url}{path}") def try_delete() -> requests.Response: with verbose_http_errors(): headers = self._create_headers(self.auth) response = self.session().delete( f"{self.base_url}{path}", headers=headers, timeout=self._timeout_or_default(timeout_in_sec), cert=self.client_cert, verify=self.verify, **( { "params": params, } if params is not None else {} ), ) response.raise_for_status() return response try: response = try_delete() except requests.exceptions.HTTPError as e: if self.auth and e.response.status_code == 401: self.auth.refresh() response = try_delete() else: raise return response
def get(self, path: str, timeout_in_sec: int | None = None) ‑> requests.models.Response
-
Expand source code Browse git
def get( self, path: str, timeout_in_sec: int | None = None, ) -> requests.Response: LOGGER.info(f"GET {self.base_url}{path}") def try_get() -> requests.Response: with verbose_http_errors(): headers = self._create_headers(self.auth) s = self.session() s.mount(self.base_url, HTTPAdapter(max_retries=Retry(3))) response = s.get( f"{self.base_url}{path}", headers=headers, timeout=self._timeout_or_default(timeout_in_sec), cert=self.client_cert, verify=self.verify, ) response.raise_for_status() return response try: response = try_get() except requests.exceptions.HTTPError as e: if self.auth and e.response.status_code == 401: self.auth.refresh() response = try_get() else: raise return response
def patch(self, path: str, json: Any, timeout_in_sec: int | None = None) ‑> requests.models.Response
-
Expand source code Browse git
def patch( self, path: str, json: Any, timeout_in_sec: int | None = None, ) -> requests.Response: LOGGER.info(f"PATCH {self.base_url}{path}") def try_patch() -> requests.Response: with verbose_http_errors(): headers = self._create_headers(self.auth) response = self.session().patch( f"{self.base_url}{path}", headers=headers, json=json, timeout=self._timeout_or_default(timeout_in_sec), cert=self.client_cert, verify=self.verify, ) response.raise_for_status() return response try: response = try_patch() except requests.exceptions.HTTPError as e: if self.auth and e.response.status_code == 401: self.auth.refresh() response = try_patch() else: raise return response
def post(self, path: str, json: Any, timeout_in_sec: int | None = None) ‑> requests.models.Response
-
Expand source code Browse git
def post( self, path: str, json: Any, timeout_in_sec: int | None = None, ) -> requests.Response: LOGGER.info(f"POST {self.base_url}{path}") def try_post() -> requests.Response: with verbose_http_errors(): headers = self._create_headers(self.auth) response = self.session().post( f"{self.base_url}{path}", headers=headers, json=json, timeout=self._timeout_or_default(timeout_in_sec), cert=self.client_cert, verify=self.verify, ) response.raise_for_status() return response try: response = try_post() except requests.exceptions.HTTPError as e: if self.auth and e.response.status_code == 401: self.auth.refresh() response = try_post() else: raise return response
def session(self) ‑> requests.sessions.Session
-
Expand source code Browse git
def session(self) -> requests.Session: session = requests.Session() if self.override_ip is not None: parsed_url = urlparse(self.base_url) session.mount( self.base_url.lower(), DNSResolverHTTPSAdapter( parsed_url.netloc.split(":", 1)[0], self.override_ip, ), ) return session