1use std::sync::Arc;
18use std::time::Duration;
19
20use askama::Template;
21use axum::Extension;
22use axum::body::Body;
23use axum::extract::Path;
24use axum::http::{Request, StatusCode};
25use axum::response::{IntoResponse, Response};
26use http::HeaderValue;
27use hyper::Uri;
28use hyper_util::rt::TokioIo;
29use mz_controller::ReplicaHttpLocator;
30use mz_controller_types::{ClusterId, ReplicaId};
31use mz_ore::netio::{SocketAddrType, Stream};
32
33use crate::http::AuthedClient;
34
/// Upper bound on how long establishing a connection to a replica process may take.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);

/// Upper bound on how long a proxied request may wait for the replica's response.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(60);
39
40pub struct ClusterProxyConfig {
42 pub(crate) locator: Arc<ReplicaHttpLocator>,
44}
45
46impl ClusterProxyConfig {
47 pub fn new(locator: Arc<ReplicaHttpLocator>) -> Self {
49 Self { locator }
50 }
51}
52
53pub(crate) async fn handle_cluster_proxy_root(
59 Path((cluster_id, replica_id, process)): Path<(String, String, usize)>,
60 config: Extension<Arc<ClusterProxyConfig>>,
61 req: Request<Body>,
62) -> Result<Response, (StatusCode, String)> {
63 handle_cluster_proxy_inner(cluster_id, replica_id, process, "", config, req).await
64}
65
66pub(crate) async fn handle_cluster_proxy(
74 Path((cluster_id, replica_id, process, path)): Path<(String, String, usize, String)>,
75 config: Extension<Arc<ClusterProxyConfig>>,
76 req: Request<Body>,
77) -> Result<Response, (StatusCode, String)> {
78 handle_cluster_proxy_inner(cluster_id, replica_id, process, &path, config, req).await
79}
80
81async fn handle_cluster_proxy_inner(
82 cluster_id: String,
83 replica_id: String,
84 process: usize,
85 path: &str,
86 config: Extension<Arc<ClusterProxyConfig>>,
87 mut req: Request<Body>,
88) -> Result<Response, (StatusCode, String)> {
89 let cluster_id: ClusterId = cluster_id.parse().map_err(|e| {
91 (
92 StatusCode::BAD_REQUEST,
93 format!("Invalid cluster_id '{cluster_id}': {e}"),
94 )
95 })?;
96
97 let replica_id: ReplicaId = replica_id.parse().map_err(|e| {
99 (
100 StatusCode::BAD_REQUEST,
101 format!("Invalid replica_id '{replica_id}': {e}"),
102 )
103 })?;
104
105 let http_addr = config
107 .locator
108 .get_http_addr(cluster_id, replica_id, process)
109 .ok_or_else(|| {
110 (
111 StatusCode::NOT_FOUND,
112 format!(
113 "No HTTP address found for cluster {cluster_id}, replica {replica_id}, process {process}"
114 ),
115 )
116 })?;
117
118 let path_query = if let Some(query) = req.uri().query() {
120 format!("/{path}?{query}")
121 } else {
122 format!("/{path}")
123 };
124
125 rewrite_request_for_replica(
126 &mut req,
127 &http_addr,
128 cluster_id,
129 replica_id,
130 process,
131 &path_query,
132 )?;
133
134 proxy_replica_request(&http_addr, req).await
135}
136
137pub(crate) fn rewrite_request_for_replica(
139 req: &mut Request<Body>,
140 http_addr: &str,
141 cluster_id: ClusterId,
142 replica_id: ReplicaId,
143 process: usize,
144 path_query: &str,
145) -> Result<(), (StatusCode, String)> {
146 let authority = match SocketAddrType::guess(http_addr) {
147 SocketAddrType::Unix => format!("cluster_{cluster_id}_replica_{replica_id}_{process}"),
149 SocketAddrType::Turmoil => http_addr.trim_start_matches("turmoil:").to_owned(),
150 SocketAddrType::Inet => http_addr.to_owned(),
151 };
152 let uri = Uri::try_from(format!("http://{authority}{path_query}")).map_err(|e| {
153 (
154 StatusCode::INTERNAL_SERVER_ERROR,
155 format!("Invalid URI 'http://{authority}{path_query}': {e}"),
156 )
157 })?;
158
159 *req.uri_mut() = uri.clone();
161
162 if let Some(host) = uri.host() {
164 req.headers_mut().insert(
165 http::header::HOST,
166 HeaderValue::from_str(host).map_err(|e| {
167 (
168 StatusCode::INTERNAL_SERVER_ERROR,
169 format!("Invalid host header '{host}': {e}"),
170 )
171 })?,
172 );
173 }
174
175 req.headers_mut()
178 .insert(http::header::CONNECTION, HeaderValue::from_static("close"));
179
180 Ok(())
181}
182
183pub(crate) async fn proxy_replica_request(
184 http_addr: &str,
185 req: Request<Body>,
186) -> Result<Response, (StatusCode, String)> {
187 let uri = req.uri().clone();
188 let stream = tokio::time::timeout(CONNECT_TIMEOUT, Stream::connect(http_addr))
190 .await
191 .map_err(|_| {
192 (
193 StatusCode::GATEWAY_TIMEOUT,
194 format!("Connection timeout to {http_addr} after {CONNECT_TIMEOUT:?}"),
195 )
196 })?
197 .map_err(|e| {
198 (
199 StatusCode::BAD_GATEWAY,
200 format!("Failed to connect to {http_addr}: {e}"),
201 )
202 })?;
203
204 let io = TokioIo::new(stream);
206 let (mut sender, conn) = hyper::client::conn::http1::handshake(io)
207 .await
208 .map_err(|e| {
209 (
210 StatusCode::BAD_GATEWAY,
211 format!("HTTP handshake with {http_addr} failed: {e}"),
212 )
213 })?;
214
215 let http_addr_owned = http_addr.to_owned();
217 mz_ore::task::spawn(|| format!("Proxy to {uri}"), async move {
218 if let Err(e) = conn.await {
219 tracing::debug!("Connection to clusterd {http_addr_owned} closed: {e}");
220 }
221 });
222
223 tokio::time::timeout(REQUEST_TIMEOUT, sender.send_request(req))
225 .await
226 .map_err(|_| {
227 (
228 StatusCode::GATEWAY_TIMEOUT,
229 format!("Request timeout to clusterd {http_addr} after {REQUEST_TIMEOUT:?}"),
230 )
231 })?
232 .map(|r| r.into_response())
233 .map_err(|e| {
234 (
235 StatusCode::BAD_GATEWAY,
236 format!("Error proxying to clusterd {http_addr}: {e}"),
237 )
238 })
239}
240
/// Display data for a single cluster replica.
pub struct ReplicaInfo {
    /// Name of the cluster the replica belongs to.
    pub cluster_name: String,
    /// Name of the replica.
    pub replica_name: String,
    /// String form of the cluster id.
    pub cluster_id: String,
    /// String form of the replica id.
    pub replica_id: String,
    /// Number of processes in the replica.
    pub process_count: usize,
    /// Indices of the replica's processes.
    pub process_indices: Vec<usize>,
}
250
251#[derive(Template)]
252#[template(path = "clusters.html")]
253struct ClustersTemplate<'a> {
254 version: &'a str,
255 replicas: Vec<ReplicaInfo>,
256}
257
258pub(crate) async fn handle_clusters(client: AuthedClient) -> impl IntoResponse {
262 let catalog = client.client.catalog_snapshot("clusters_page").await;
264
265 let mut replicas = Vec::new();
266 for cluster in catalog.clusters() {
267 for replica in cluster.replicas() {
268 let process_count = replica.config.location.num_processes();
269 replicas.push(ReplicaInfo {
270 cluster_name: cluster.name.clone(),
271 replica_name: replica.name.clone(),
272 cluster_id: cluster.id.to_string(),
273 replica_id: replica.replica_id.to_string(),
274 process_count,
275 process_indices: (0..process_count).collect(),
276 });
277 }
278 }
279
280 let _ = catalog;
281
282 replicas.sort_by(|a, b| {
284 let a_is_system = a.cluster_id.starts_with('s');
285 let b_is_system = b.cluster_id.starts_with('s');
286 b_is_system
287 .cmp(&a_is_system)
288 .then_with(|| a.cluster_name.cmp(&b.cluster_name))
289 .then_with(|| a.replica_name.cmp(&b.replica_name))
290 });
291
292 mz_http_util::template_response(ClustersTemplate {
293 version: crate::BUILD_INFO.version,
294 replicas,
295 })
296}