misc.python.materialize.redpanda_cloud
1# Copyright Materialize, Inc. and contributors. All rights reserved. 2# 3# Use of this software is governed by the Business Source License 4# included in the LICENSE file at the root of this repository. 5# 6# As of the Change Date specified in that file, in accordance with 7# the Business Source License, use of this software will be governed 8# by the Apache License, Version 2.0. 9 10import os 11import time 12from typing import Any 13 14import requests 15 16 17def get_result(response: requests.Response) -> dict[str, Any]: 18 if response.status_code not in (200, 201, 202): 19 raise ValueError( 20 f"Redpanda API call failed: {response.status_code} {response.text}" 21 ) 22 result = response.json() 23 print(result) 24 return result 25 26 27class RedpandaCluster: 28 def __init__(self, token: str, dataplane_api_url: str) -> None: 29 self.token = token 30 self.dataplane_api_url = dataplane_api_url 31 32 def headers(self) -> dict[str, str]: 33 return {"Authorization": f"Bearer {self.token}"} 34 35 def create(self, object: str, content: dict[str, Any]) -> dict[str, Any]: 36 return get_result( 37 requests.post( 38 f"{self.dataplane_api_url}/v1alpha1/{object}", 39 json=content, 40 headers=self.headers(), 41 ) 42 ) 43 44 45class RedpandaCloud: 46 def __init__(self) -> None: 47 client_id = os.environ["REDPANDA_CLOUD_CLIENT_ID"] 48 client_secret = os.environ["REDPANDA_CLOUD_CLIENT_SECRET"] 49 50 result = get_result( 51 requests.post( 52 "https://auth.prd.cloud.redpanda.com/oauth/token", 53 json={ 54 "client_id": client_id, 55 "client_secret": client_secret, 56 "audience": "cloudv2-production.redpanda.cloud", 57 "grant_type": "client_credentials", 58 }, 59 ) 60 ) 61 # Can't finish our test otherwise 62 assert result["expires_in"] >= 3600, result 63 self.token = result["access_token"] 64 self.controlplane_api_url = "https://api.redpanda.com" 65 66 def headers(self) -> dict[str, str]: 67 return {"Authorization": f"Bearer {self.token}"} 68 69 def wait(self, result: dict[str, Any]) -> dict[str, Any]: 70 operation_id = result["operation"]["id"] 71 while True: 72 time.sleep(10) 73 result = get_result( 74 requests.get( 75 f"{self.controlplane_api_url}/v1beta2/operations/{operation_id}", 76 headers=self.headers(), 77 ) 78 ) 79 if result["operation"]["state"] == "STATE_COMPLETED": 80 return result["operation"] 81 if result["operation"]["state"] == "STATE_FAILED": 82 raise ValueError(result) 83 if result["operation"]["state"] != "STATE_IN_PROGRESS": 84 raise ValueError(result) 85 86 def create(self, object: str, content: dict[str, Any] | None) -> dict[str, Any]: 87 return get_result( 88 requests.post( 89 f"{self.controlplane_api_url}/v1beta2/{object}", 90 json=content, 91 headers=self.headers(), 92 ) 93 ) 94 95 def patch(self, object: str, content: dict[str, Any] | None) -> dict[str, Any]: 96 return get_result( 97 requests.patch( 98 f"{self.controlplane_api_url}/v1beta2/{object}", 99 json=content, 100 headers=self.headers(), 101 ) 102 ) 103 104 def get(self, object: str) -> dict[str, Any]: 105 return get_result( 106 requests.get( 107 f"{self.controlplane_api_url}/v1beta2/{object}", 108 headers=self.headers(), 109 ) 110 ) 111 112 def delete(self, object: str, id: str) -> dict[str, Any]: 113 return get_result( 114 requests.delete( 115 f"{self.controlplane_api_url}/v1beta2/{object}/{id}", 116 headers=self.headers(), 117 ) 118 ) 119 120 def get_cluster(self, cluster_info: dict[str, Any]) -> RedpandaCluster: 121 return RedpandaCluster(self.token, cluster_info["dataplane_api"]["url"])
def
get_result(response: requests.models.Response) -> dict[str, typing.Any]:
class
RedpandaCluster:
28class RedpandaCluster: 29 def __init__(self, token: str, dataplane_api_url: str) -> None: 30 self.token = token 31 self.dataplane_api_url = dataplane_api_url 32 33 def headers(self) -> dict[str, str]: 34 return {"Authorization": f"Bearer {self.token}"} 35 36 def create(self, object: str, content: dict[str, Any]) -> dict[str, Any]: 37 return get_result( 38 requests.post( 39 f"{self.dataplane_api_url}/v1alpha1/{object}", 40 json=content, 41 headers=self.headers(), 42 ) 43 )
class
RedpandaCloud:
46class RedpandaCloud: 47 def __init__(self) -> None: 48 client_id = os.environ["REDPANDA_CLOUD_CLIENT_ID"] 49 client_secret = os.environ["REDPANDA_CLOUD_CLIENT_SECRET"] 50 51 result = get_result( 52 requests.post( 53 "https://auth.prd.cloud.redpanda.com/oauth/token", 54 json={ 55 "client_id": client_id, 56 "client_secret": client_secret, 57 "audience": "cloudv2-production.redpanda.cloud", 58 "grant_type": "client_credentials", 59 }, 60 ) 61 ) 62 # Can't finish our test otherwise 63 assert result["expires_in"] >= 3600, result 64 self.token = result["access_token"] 65 self.controlplane_api_url = "https://api.redpanda.com" 66 67 def headers(self) -> dict[str, str]: 68 return {"Authorization": f"Bearer {self.token}"} 69 70 def wait(self, result: dict[str, Any]) -> dict[str, Any]: 71 operation_id = result["operation"]["id"] 72 while True: 73 time.sleep(10) 74 result = get_result( 75 requests.get( 76 f"{self.controlplane_api_url}/v1beta2/operations/{operation_id}", 77 headers=self.headers(), 78 ) 79 ) 80 if result["operation"]["state"] == "STATE_COMPLETED": 81 return result["operation"] 82 if result["operation"]["state"] == "STATE_FAILED": 83 raise ValueError(result) 84 if result["operation"]["state"] != "STATE_IN_PROGRESS": 85 raise ValueError(result) 86 87 def create(self, object: str, content: dict[str, Any] | None) -> dict[str, Any]: 88 return get_result( 89 requests.post( 90 f"{self.controlplane_api_url}/v1beta2/{object}", 91 json=content, 92 headers=self.headers(), 93 ) 94 ) 95 96 def patch(self, object: str, content: dict[str, Any] | None) -> dict[str, Any]: 97 return get_result( 98 requests.patch( 99 f"{self.controlplane_api_url}/v1beta2/{object}", 100 json=content, 101 headers=self.headers(), 102 ) 103 ) 104 105 def get(self, object: str) -> dict[str, Any]: 106 return get_result( 107 requests.get( 108 f"{self.controlplane_api_url}/v1beta2/{object}", 109 headers=self.headers(), 110 ) 111 ) 112 113 def delete(self, object: str, id: str) -> dict[str, Any]: 114 return get_result( 115 requests.delete( 116 f"{self.controlplane_api_url}/v1beta2/{object}/{id}", 117 headers=self.headers(), 118 ) 119 ) 120 121 def get_cluster(self, cluster_info: dict[str, Any]) -> RedpandaCluster: 122 return RedpandaCluster(self.token, cluster_info["dataplane_api"]["url"])
def
wait(self, result: dict[str, typing.Any]) -> dict[str, typing.Any]:
70 def wait(self, result: dict[str, Any]) -> dict[str, Any]: 71 operation_id = result["operation"]["id"] 72 while True: 73 time.sleep(10) 74 result = get_result( 75 requests.get( 76 f"{self.controlplane_api_url}/v1beta2/operations/{operation_id}", 77 headers=self.headers(), 78 ) 79 ) 80 if result["operation"]["state"] == "STATE_COMPLETED": 81 return result["operation"] 82 if result["operation"]["state"] == "STATE_FAILED": 83 raise ValueError(result) 84 if result["operation"]["state"] != "STATE_IN_PROGRESS": 85 raise ValueError(result)