Skip to main content

mz_environmentd/http/
cluster.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! HTTP proxy for cluster replica endpoints.
11//!
12//! This module provides an HTTP proxy that forwards requests from environmentd
13//! to clusterd internal HTTP endpoints (profiling, metrics, tracing). This allows
14//! accessing clusterd endpoints through environmentd's canonical HTTP port without
15//! requiring direct network access to the clusterd pods.
16
17use std::sync::Arc;
18use std::time::Duration;
19
20use askama::Template;
21use axum::Extension;
22use axum::body::Body;
23use axum::extract::Path;
24use axum::http::{Request, StatusCode};
25use axum::response::{IntoResponse, Response};
26use http::HeaderValue;
27use hyper::Uri;
28use hyper_util::rt::TokioIo;
29use mz_controller::ReplicaHttpLocator;
30use mz_controller_types::{ClusterId, ReplicaId};
31use mz_ore::netio::{SocketAddrType, Stream};
32
33use crate::http::AuthedClient;
34
/// Upper bound on how long to wait while connecting to a replica's HTTP endpoint.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
/// Upper bound on how long to wait for a response once a proxied request is sent.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(60);
39
40/// Configuration for the cluster HTTP proxy.
41pub struct ClusterProxyConfig {
42    /// Handle to look up replica HTTP addresses.
43    pub(crate) locator: Arc<ReplicaHttpLocator>,
44}
45
46impl ClusterProxyConfig {
47    /// Creates a new `ClusterProxyConfig`.
48    pub fn new(locator: Arc<ReplicaHttpLocator>) -> Self {
49        Self { locator }
50    }
51}
52
53/// Proxy handler for cluster replica HTTP endpoints (root path).
54///
55/// Route: `/api/cluster/:cluster_id/replica/:replica_id/process/:process/`
56///
57/// This handler handles requests to the root of a clusterd process's HTTP endpoint.
58pub(crate) async fn handle_cluster_proxy_root(
59    Path((cluster_id, replica_id, process)): Path<(String, String, usize)>,
60    config: Extension<Arc<ClusterProxyConfig>>,
61    req: Request<Body>,
62) -> Result<Response, (StatusCode, String)> {
63    handle_cluster_proxy_inner(cluster_id, replica_id, process, "", config, req).await
64}
65
66/// Proxy handler for cluster replica HTTP endpoints.
67///
68/// Route: `/api/cluster/:cluster_id/replica/:replica_id/process/:process/*path`
69///
70/// This handler proxies HTTP requests to the internal HTTP endpoint of a specific
71/// clusterd process. Each replica can have multiple processes (based on `scale`),
72/// and each process has its own HTTP endpoint.
73pub(crate) async fn handle_cluster_proxy(
74    Path((cluster_id, replica_id, process, path)): Path<(String, String, usize, String)>,
75    config: Extension<Arc<ClusterProxyConfig>>,
76    req: Request<Body>,
77) -> Result<Response, (StatusCode, String)> {
78    handle_cluster_proxy_inner(cluster_id, replica_id, process, &path, config, req).await
79}
80
81async fn handle_cluster_proxy_inner(
82    cluster_id: String,
83    replica_id: String,
84    process: usize,
85    path: &str,
86    config: Extension<Arc<ClusterProxyConfig>>,
87    mut req: Request<Body>,
88) -> Result<Response, (StatusCode, String)> {
89    // Parse cluster ID
90    let cluster_id: ClusterId = cluster_id.parse().map_err(|e| {
91        (
92            StatusCode::BAD_REQUEST,
93            format!("Invalid cluster_id '{cluster_id}': {e}"),
94        )
95    })?;
96
97    // Parse replica ID
98    let replica_id: ReplicaId = replica_id.parse().map_err(|e| {
99        (
100            StatusCode::BAD_REQUEST,
101            format!("Invalid replica_id '{replica_id}': {e}"),
102        )
103    })?;
104
105    // Look up HTTP address for this replica and process
106    let http_addr = config
107        .locator
108        .get_http_addr(cluster_id, replica_id, process)
109        .ok_or_else(|| {
110            (
111                StatusCode::NOT_FOUND,
112                format!(
113                    "No HTTP address found for cluster {cluster_id}, replica {replica_id}, process {process}"
114                ),
115            )
116        })?;
117
118    // Build target URI, preserving query string if present
119    let path_query = if let Some(query) = req.uri().query() {
120        format!("/{path}?{query}")
121    } else {
122        format!("/{path}")
123    };
124
125    rewrite_request_for_replica(
126        &mut req,
127        &http_addr,
128        cluster_id,
129        replica_id,
130        process,
131        &path_query,
132    )?;
133
134    proxy_replica_request(&http_addr, req).await
135}
136
137/// Rewrites `req` to target a clusterd HTTP endpoint at `http_addr`.
138pub(crate) fn rewrite_request_for_replica(
139    req: &mut Request<Body>,
140    http_addr: &str,
141    cluster_id: ClusterId,
142    replica_id: ReplicaId,
143    process: usize,
144    path_query: &str,
145) -> Result<(), (StatusCode, String)> {
146    let authority = match SocketAddrType::guess(http_addr) {
147        // UDS addresses aren't valid URI authorities, use a placeholder.
148        SocketAddrType::Unix => format!("cluster_{cluster_id}_replica_{replica_id}_{process}"),
149        SocketAddrType::Turmoil => http_addr.trim_start_matches("turmoil:").to_owned(),
150        SocketAddrType::Inet => http_addr.to_owned(),
151    };
152    let uri = Uri::try_from(format!("http://{authority}{path_query}")).map_err(|e| {
153        (
154            StatusCode::INTERNAL_SERVER_ERROR,
155            format!("Invalid URI 'http://{authority}{path_query}': {e}"),
156        )
157    })?;
158
159    // Update request with new URI
160    *req.uri_mut() = uri.clone();
161
162    // Set Host header to target
163    if let Some(host) = uri.host() {
164        req.headers_mut().insert(
165            http::header::HOST,
166            HeaderValue::from_str(host).map_err(|e| {
167                (
168                    StatusCode::INTERNAL_SERVER_ERROR,
169                    format!("Invalid host header '{host}': {e}"),
170                )
171            })?,
172        );
173    }
174
175    // Signal that this is a one-shot connection so the server shuts down
176    // gracefully instead of waiting for more requests.
177    req.headers_mut()
178        .insert(http::header::CONNECTION, HeaderValue::from_static("close"));
179
180    Ok(())
181}
182
183pub(crate) async fn proxy_replica_request(
184    http_addr: &str,
185    req: Request<Body>,
186) -> Result<Response, (StatusCode, String)> {
187    let uri = req.uri().clone();
188    // Connect to the target with timeout
189    let stream = tokio::time::timeout(CONNECT_TIMEOUT, Stream::connect(http_addr))
190        .await
191        .map_err(|_| {
192            (
193                StatusCode::GATEWAY_TIMEOUT,
194                format!("Connection timeout to {http_addr} after {CONNECT_TIMEOUT:?}"),
195            )
196        })?
197        .map_err(|e| {
198            (
199                StatusCode::BAD_GATEWAY,
200                format!("Failed to connect to {http_addr}: {e}"),
201            )
202        })?;
203
204    // Perform HTTP/1.1 handshake
205    let io = TokioIo::new(stream);
206    let (mut sender, conn) = hyper::client::conn::http1::handshake(io)
207        .await
208        .map_err(|e| {
209            (
210                StatusCode::BAD_GATEWAY,
211                format!("HTTP handshake with {http_addr} failed: {e}"),
212            )
213        })?;
214
215    // Spawn task to drive the connection
216    let http_addr_owned = http_addr.to_owned();
217    mz_ore::task::spawn(|| format!("Proxy to {uri}"), async move {
218        if let Err(e) = conn.await {
219            tracing::debug!("Connection to clusterd {http_addr_owned} closed: {e}");
220        }
221    });
222
223    // Send the request with timeout
224    tokio::time::timeout(REQUEST_TIMEOUT, sender.send_request(req))
225        .await
226        .map_err(|_| {
227            (
228                StatusCode::GATEWAY_TIMEOUT,
229                format!("Request timeout to clusterd {http_addr} after {REQUEST_TIMEOUT:?}"),
230            )
231        })?
232        .map(|r| r.into_response())
233        .map_err(|e| {
234            (
235                StatusCode::BAD_GATEWAY,
236                format!("Error proxying to clusterd {http_addr}: {e}"),
237            )
238        })
239}
240
/// One row of the clusters overview page: a single cluster replica.
pub struct ReplicaInfo {
    /// Name of the cluster the replica belongs to.
    pub cluster_name: String,
    /// Name of the replica.
    pub replica_name: String,
    /// Cluster ID, rendered as a string.
    pub cluster_id: String,
    /// Replica ID, rendered as a string.
    pub replica_id: String,
    /// Number of processes the replica consists of.
    pub process_count: usize,
    /// The process indices `0..process_count`, precomputed for the template.
    pub process_indices: Vec<usize>,
}
250
251#[derive(Template)]
252#[template(path = "clusters.html")]
253struct ClustersTemplate<'a> {
254    version: &'a str,
255    replicas: Vec<ReplicaInfo>,
256}
257
258/// Handler for the clusters overview page.
259///
260/// Displays a table of all cluster replicas with links to their HTTP endpoints.
261pub(crate) async fn handle_clusters(client: AuthedClient) -> impl IntoResponse {
262    // Look up names from the catalog
263    let catalog = client.client.catalog_snapshot("clusters_page").await;
264
265    let mut replicas = Vec::new();
266    for cluster in catalog.clusters() {
267        for replica in cluster.replicas() {
268            let process_count = replica.config.location.num_processes();
269            replicas.push(ReplicaInfo {
270                cluster_name: cluster.name.clone(),
271                replica_name: replica.name.clone(),
272                cluster_id: cluster.id.to_string(),
273                replica_id: replica.replica_id.to_string(),
274                process_count,
275                process_indices: (0..process_count).collect(),
276            });
277        }
278    }
279
280    let _ = catalog;
281
282    // Sort by system clusters first (cluster_id starts with 's'), then cluster name, then replica name
283    replicas.sort_by(|a, b| {
284        let a_is_system = a.cluster_id.starts_with('s');
285        let b_is_system = b.cluster_id.starts_with('s');
286        b_is_system
287            .cmp(&a_is_system)
288            .then_with(|| a.cluster_name.cmp(&b.cluster_name))
289            .then_with(|| a.replica_name.cmp(&b.replica_name))
290    });
291
292    mz_http_util::template_response(ClustersTemplate {
293        version: crate::BUILD_INFO.version,
294        replicas,
295    })
296}