misc.python.materialize.redpanda_cloud

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10import os
 11import time
 12from typing import Any
 13
 14import requests
 15
 16
 17def get_result(response: requests.Response) -> dict[str, Any]:
 18    if response.status_code not in (200, 201, 202):
 19        raise ValueError(
 20            f"Redpanda API call failed: {response.status_code} {response.text}"
 21        )
 22    result = response.json()
 23    print(result)
 24    return result
 25
 26
 27class RedpandaCluster:
 28    def __init__(self, token: str, dataplane_api_url: str) -> None:
 29        self.token = token
 30        self.dataplane_api_url = dataplane_api_url
 31
 32    def headers(self) -> dict[str, str]:
 33        return {"Authorization": f"Bearer {self.token}"}
 34
 35    def create(self, object: str, content: dict[str, Any]) -> dict[str, Any]:
 36        return get_result(
 37            requests.post(
 38                f"{self.dataplane_api_url}/v1alpha1/{object}",
 39                json=content,
 40                headers=self.headers(),
 41            )
 42        )
 43
 44
 45class RedpandaCloud:
 46    def __init__(self) -> None:
 47        client_id = os.environ["REDPANDA_CLOUD_CLIENT_ID"]
 48        client_secret = os.environ["REDPANDA_CLOUD_CLIENT_SECRET"]
 49
 50        result = get_result(
 51            requests.post(
 52                "https://auth.prd.cloud.redpanda.com/oauth/token",
 53                json={
 54                    "client_id": client_id,
 55                    "client_secret": client_secret,
 56                    "audience": "cloudv2-production.redpanda.cloud",
 57                    "grant_type": "client_credentials",
 58                },
 59            )
 60        )
 61        # Can't finish our test otherwise
 62        assert result["expires_in"] >= 3600, result
 63        self.token = result["access_token"]
 64        self.controlplane_api_url = "https://api.redpanda.com"
 65
 66    def headers(self) -> dict[str, str]:
 67        return {"Authorization": f"Bearer {self.token}"}
 68
 69    def wait(self, result: dict[str, Any]) -> dict[str, Any]:
 70        operation_id = result["operation"]["id"]
 71        while True:
 72            time.sleep(10)
 73            result = get_result(
 74                requests.get(
 75                    f"{self.controlplane_api_url}/v1beta2/operations/{operation_id}",
 76                    headers=self.headers(),
 77                )
 78            )
 79            if result["operation"]["state"] == "STATE_COMPLETED":
 80                return result["operation"]
 81            if result["operation"]["state"] == "STATE_FAILED":
 82                raise ValueError(result)
 83            if result["operation"]["state"] != "STATE_IN_PROGRESS":
 84                raise ValueError(result)
 85
 86    def create(self, object: str, content: dict[str, Any] | None) -> dict[str, Any]:
 87        return get_result(
 88            requests.post(
 89                f"{self.controlplane_api_url}/v1beta2/{object}",
 90                json=content,
 91                headers=self.headers(),
 92            )
 93        )
 94
 95    def patch(self, object: str, content: dict[str, Any] | None) -> dict[str, Any]:
 96        return get_result(
 97            requests.patch(
 98                f"{self.controlplane_api_url}/v1beta2/{object}",
 99                json=content,
100                headers=self.headers(),
101            )
102        )
103
104    def get(self, object: str) -> dict[str, Any]:
105        return get_result(
106            requests.get(
107                f"{self.controlplane_api_url}/v1beta2/{object}",
108                headers=self.headers(),
109            )
110        )
111
112    def delete(self, object: str, id: str) -> dict[str, Any]:
113        return get_result(
114            requests.delete(
115                f"{self.controlplane_api_url}/v1beta2/{object}/{id}",
116                headers=self.headers(),
117            )
118        )
119
120    def get_cluster(self, cluster_info: dict[str, Any]) -> RedpandaCluster:
121        return RedpandaCluster(self.token, cluster_info["dataplane_api"]["url"])
def get_result(response: requests.models.Response) -> dict[str, typing.Any]:
18def get_result(response: requests.Response) -> dict[str, Any]:
19    if response.status_code not in (200, 201, 202):
20        raise ValueError(
21            f"Redpanda API call failed: {response.status_code} {response.text}"
22        )
23    result = response.json()
24    print(result)
25    return result
class RedpandaCluster:
28class RedpandaCluster:
29    def __init__(self, token: str, dataplane_api_url: str) -> None:
30        self.token = token
31        self.dataplane_api_url = dataplane_api_url
32
33    def headers(self) -> dict[str, str]:
34        return {"Authorization": f"Bearer {self.token}"}
35
36    def create(self, object: str, content: dict[str, Any]) -> dict[str, Any]:
37        return get_result(
38            requests.post(
39                f"{self.dataplane_api_url}/v1alpha1/{object}",
40                json=content,
41                headers=self.headers(),
42            )
43        )
RedpandaCluster(token: str, dataplane_api_url: str)
29    def __init__(self, token: str, dataplane_api_url: str) -> None:
30        self.token = token
31        self.dataplane_api_url = dataplane_api_url
token
dataplane_api_url
def headers(self) -> dict[str, str]:
33    def headers(self) -> dict[str, str]:
34        return {"Authorization": f"Bearer {self.token}"}
def create( self, object: str, content: dict[str, typing.Any]) -> dict[str, typing.Any]:
36    def create(self, object: str, content: dict[str, Any]) -> dict[str, Any]:
37        return get_result(
38            requests.post(
39                f"{self.dataplane_api_url}/v1alpha1/{object}",
40                json=content,
41                headers=self.headers(),
42            )
43        )
class RedpandaCloud:
 46class RedpandaCloud:
 47    def __init__(self) -> None:
 48        client_id = os.environ["REDPANDA_CLOUD_CLIENT_ID"]
 49        client_secret = os.environ["REDPANDA_CLOUD_CLIENT_SECRET"]
 50
 51        result = get_result(
 52            requests.post(
 53                "https://auth.prd.cloud.redpanda.com/oauth/token",
 54                json={
 55                    "client_id": client_id,
 56                    "client_secret": client_secret,
 57                    "audience": "cloudv2-production.redpanda.cloud",
 58                    "grant_type": "client_credentials",
 59                },
 60            )
 61        )
 62        # Can't finish our test otherwise
 63        assert result["expires_in"] >= 3600, result
 64        self.token = result["access_token"]
 65        self.controlplane_api_url = "https://api.redpanda.com"
 66
 67    def headers(self) -> dict[str, str]:
 68        return {"Authorization": f"Bearer {self.token}"}
 69
 70    def wait(self, result: dict[str, Any]) -> dict[str, Any]:
 71        operation_id = result["operation"]["id"]
 72        while True:
 73            time.sleep(10)
 74            result = get_result(
 75                requests.get(
 76                    f"{self.controlplane_api_url}/v1beta2/operations/{operation_id}",
 77                    headers=self.headers(),
 78                )
 79            )
 80            if result["operation"]["state"] == "STATE_COMPLETED":
 81                return result["operation"]
 82            if result["operation"]["state"] == "STATE_FAILED":
 83                raise ValueError(result)
 84            if result["operation"]["state"] != "STATE_IN_PROGRESS":
 85                raise ValueError(result)
 86
 87    def create(self, object: str, content: dict[str, Any] | None) -> dict[str, Any]:
 88        return get_result(
 89            requests.post(
 90                f"{self.controlplane_api_url}/v1beta2/{object}",
 91                json=content,
 92                headers=self.headers(),
 93            )
 94        )
 95
 96    def patch(self, object: str, content: dict[str, Any] | None) -> dict[str, Any]:
 97        return get_result(
 98            requests.patch(
 99                f"{self.controlplane_api_url}/v1beta2/{object}",
100                json=content,
101                headers=self.headers(),
102            )
103        )
104
105    def get(self, object: str) -> dict[str, Any]:
106        return get_result(
107            requests.get(
108                f"{self.controlplane_api_url}/v1beta2/{object}",
109                headers=self.headers(),
110            )
111        )
112
113    def delete(self, object: str, id: str) -> dict[str, Any]:
114        return get_result(
115            requests.delete(
116                f"{self.controlplane_api_url}/v1beta2/{object}/{id}",
117                headers=self.headers(),
118            )
119        )
120
121    def get_cluster(self, cluster_info: dict[str, Any]) -> RedpandaCluster:
122        return RedpandaCluster(self.token, cluster_info["dataplane_api"]["url"])
token
controlplane_api_url
def headers(self) -> dict[str, str]:
67    def headers(self) -> dict[str, str]:
68        return {"Authorization": f"Bearer {self.token}"}
def wait(self, result: dict[str, typing.Any]) -> dict[str, typing.Any]:
70    def wait(self, result: dict[str, Any]) -> dict[str, Any]:
71        operation_id = result["operation"]["id"]
72        while True:
73            time.sleep(10)
74            result = get_result(
75                requests.get(
76                    f"{self.controlplane_api_url}/v1beta2/operations/{operation_id}",
77                    headers=self.headers(),
78                )
79            )
80            if result["operation"]["state"] == "STATE_COMPLETED":
81                return result["operation"]
82            if result["operation"]["state"] == "STATE_FAILED":
83                raise ValueError(result)
84            if result["operation"]["state"] != "STATE_IN_PROGRESS":
85                raise ValueError(result)
def create( self, object: str, content: dict[str, typing.Any] | None) -> dict[str, typing.Any]:
87    def create(self, object: str, content: dict[str, Any] | None) -> dict[str, Any]:
88        return get_result(
89            requests.post(
90                f"{self.controlplane_api_url}/v1beta2/{object}",
91                json=content,
92                headers=self.headers(),
93            )
94        )
def patch( self, object: str, content: dict[str, typing.Any] | None) -> dict[str, typing.Any]:
 96    def patch(self, object: str, content: dict[str, Any] | None) -> dict[str, Any]:
 97        return get_result(
 98            requests.patch(
 99                f"{self.controlplane_api_url}/v1beta2/{object}",
100                json=content,
101                headers=self.headers(),
102            )
103        )
def get(self, object: str) -> dict[str, typing.Any]:
105    def get(self, object: str) -> dict[str, Any]:
106        return get_result(
107            requests.get(
108                f"{self.controlplane_api_url}/v1beta2/{object}",
109                headers=self.headers(),
110            )
111        )
def delete(self, object: str, id: str) -> dict[str, typing.Any]:
113    def delete(self, object: str, id: str) -> dict[str, Any]:
114        return get_result(
115            requests.delete(
116                f"{self.controlplane_api_url}/v1beta2/{object}/{id}",
117                headers=self.headers(),
118            )
119        )
def get_cluster( self, cluster_info: dict[str, typing.Any]) -> RedpandaCluster:
121    def get_cluster(self, cluster_info: dict[str, Any]) -> RedpandaCluster:
122        return RedpandaCluster(self.token, cluster_info["dataplane_api"]["url"])