1use anyhow::{Context as AnyhowContext, Result};
12use futures::StreamExt;
13use k8s_openapi::api::core::v1::ServicePort;
14use reqwest::header::{HeaderMap, HeaderValue};
15use std::path::Path;
16use std::sync::Arc;
17use tokio::fs::{File, create_dir_all};
18use tokio::io::AsyncWriteExt;
19use tracing::{info, warn};
20use url::Url;
21
22use crate::kubectl_port_forwarder::{
23 KubectlPortForwarder, find_cluster_services, find_environmentd_service,
24};
25use crate::{AuthMode, Context, EmulatorContext, PasswordAuthCredentials, SelfManagedContext};
26
27static HEAP_PROFILES_DIR: &str = "profiles";
28static PROM_METRICS_DIR: &str = "prom_metrics";
29static PROM_METRICS_ENDPOINT: &str = "metrics";
30static ENVD_HEAP_PROFILE_ENDPOINT: &str = "prof/heap";
31static CLUSTERD_HEAP_PROFILE_ENDPOINT: &str = "heap";
32static DEFAULT_EXTERNAL_HTTP_PORT: i32 = 6877;
34static DEFAULT_INTERNAL_HTTP_PORT: i32 = 6878;
36
37const INTERNAL_HTTP_PORT_LABEL: &str = "internal-http";
39const EXTERNAL_HTTP_PORT_LABEL: &str = "https";
42
43enum ServiceType {
44 Clusterd,
45 Environmentd,
46}
47
48fn get_profile_endpoint(service_type: &ServiceType) -> &'static str {
49 match service_type {
50 ServiceType::Clusterd => CLUSTERD_HEAP_PROFILE_ENDPOINT,
51 ServiceType::Environmentd => ENVD_HEAP_PROFILE_ENDPOINT,
52 }
53}
54
55#[derive(Debug, Clone)]
56struct HttpPortLabels {
57 heap_profile_port_label: &'static str,
58 prom_metrics_port_label: &'static str,
59}
60
61fn get_port_labels(auth_mode: &AuthMode, service_type: &ServiceType) -> HttpPortLabels {
62 match (auth_mode, service_type) {
63 (AuthMode::None, ServiceType::Clusterd)
64 | (AuthMode::None, ServiceType::Environmentd)
65 | (AuthMode::Password(_), ServiceType::Clusterd) => HttpPortLabels {
68 heap_profile_port_label: INTERNAL_HTTP_PORT_LABEL,
69 prom_metrics_port_label: INTERNAL_HTTP_PORT_LABEL,
70 },
71 (AuthMode::Password(_), ServiceType::Environmentd) => HttpPortLabels {
72 heap_profile_port_label: EXTERNAL_HTTP_PORT_LABEL,
73 prom_metrics_port_label: INTERNAL_HTTP_PORT_LABEL,
74 },
75 }
76}
77
78struct HttpDefaultPorts {
79 heap_profile_port: i32,
80 prom_metrics_port: i32,
81}
82
83fn get_default_port(auth_mode: &AuthMode) -> HttpDefaultPorts {
84 match auth_mode {
85 AuthMode::None => HttpDefaultPorts {
86 heap_profile_port: DEFAULT_INTERNAL_HTTP_PORT,
87 prom_metrics_port: DEFAULT_INTERNAL_HTTP_PORT,
88 },
89 AuthMode::Password(_) => HttpDefaultPorts {
90 heap_profile_port: DEFAULT_EXTERNAL_HTTP_PORT,
91 prom_metrics_port: DEFAULT_INTERNAL_HTTP_PORT,
92 },
93 }
94}
95
96pub struct HttpDumpClient<'n> {
98 context: &'n Context,
99 auth_mode: &'n AuthMode,
100 http_client: &'n reqwest::Client,
101}
102
103impl<'n> HttpDumpClient<'n> {
105 pub fn new(
106 context: &'n Context,
107 auth_mode: &'n AuthMode,
108 http_client: &'n reqwest::Client,
109 ) -> Self {
110 Self {
111 context,
112 auth_mode,
113 http_client,
114 }
115 }
116
117 async fn dump_request_to_file(
118 &self,
119 relative_url: &str,
120 headers: HeaderMap,
121 output_path: &Path,
122 ) -> Result<(), anyhow::Error> {
123 let create_request = |url: &Url| {
124 let mut request_builder = self
125 .http_client
126 .get(url.to_string())
127 .headers(headers.clone());
128
129 if let AuthMode::Password(PasswordAuthCredentials { username, password }) =
130 self.auth_mode
131 {
132 request_builder = request_builder.basic_auth(&username, Some(&password));
133 }
134
135 request_builder
136 };
137 let mut url = Url::parse(&format!("https://{}", relative_url))
139 .with_context(|| format!("Failed to parse URL: https://{}", relative_url))?;
140
141 let request = create_request(&url);
142
143 let mut response = request.send().await;
144
145 if response.is_err() {
146 let _ = url.set_scheme("http");
148
149 response = create_request(&url).send().await;
150 }
151
152 let response = response.with_context(|| format!("Failed to send request to {}", url))?;
153
154 if !response.status().is_success() {
155 return Err(anyhow::anyhow!(
156 "Failed to get response from {}: {}",
157 url,
158 response.status()
159 ));
160 }
161
162 let mut file = File::create(output_path)
163 .await
164 .with_context(|| format!("Failed to create file: {}", output_path.display()))?;
165
166 let mut stream = response.bytes_stream();
167 while let Some(chunk) = stream.next().await {
168 let chunk = chunk.with_context(|| "Failed to read chunk from response")?;
169 file.write_all(&chunk)
170 .await
171 .with_context(|| "Failed to write chunk to file")?;
172 }
173
174 file.flush()
175 .await
176 .with_context(|| "Failed to flush file contents")?;
177
178 Ok(())
179 }
180
181 pub async fn dump_heap_profile(&self, relative_url: &str, service_name: &str) -> Result<()> {
183 let output_dir = self.context.base_path.join(HEAP_PROFILES_DIR);
184 create_dir_all(&output_dir).await.with_context(|| {
185 format!(
186 "Failed to create output directory: {}",
187 output_dir.display()
188 )
189 })?;
190 let output_path = output_dir.join(format!("{}.memprof.pprof.gz", service_name));
191
192 self.dump_request_to_file(
193 relative_url,
194 {
195 let mut headers = HeaderMap::new();
196 headers.insert(
197 "Accept",
198 HeaderValue::from_static("application/octet-stream"),
199 );
200 headers
201 },
202 &output_path,
203 )
204 .await
205 .with_context(|| format!("Failed to dump heap profile to {}", output_path.display()))?;
206
207 Ok(())
208 }
209
210 pub async fn dump_prometheus_metrics(
211 &self,
212 relative_url: &str,
213 service_name: &str,
214 ) -> Result<()> {
215 let output_dir = self.context.base_path.join(PROM_METRICS_DIR);
216 create_dir_all(&output_dir).await.with_context(|| {
217 format!(
218 "Failed to create output directory: {}",
219 output_dir.display()
220 )
221 })?;
222
223 let output_path = output_dir.join(format!("{}.metrics.txt", service_name));
224 self.dump_request_to_file(
225 relative_url,
226 {
227 let mut headers = HeaderMap::new();
228 headers.insert("Accept", HeaderValue::from_static("text/plain"));
229 headers
230 },
231 &output_path,
232 )
233 .await?;
234
235 Ok(())
236 }
237}
238
239pub async fn dump_emulator_http_resources(
241 context: &Context,
242 emulator_context: &EmulatorContext,
243) -> Result<()> {
244 let http_client = reqwest::Client::new();
245 let dump_task = HttpDumpClient::new(
246 context,
247 &emulator_context.http_connection_auth_mode,
248 &http_client,
249 );
250
251 if context.dump_heap_profiles {
252 let resource_name = "environmentd".to_string();
253
254 if let Err(e) = dump_task
256 .dump_heap_profile(
257 &format!(
258 "{}:{}/{}",
259 emulator_context.container_ip,
260 get_default_port(&emulator_context.http_connection_auth_mode).heap_profile_port,
261 ENVD_HEAP_PROFILE_ENDPOINT
262 ),
263 &resource_name,
264 )
265 .await
266 {
267 warn!("Failed to dump heap profile: {:#}", e);
268 }
269 }
270
271 if context.dump_prometheus_metrics {
272 let resource_name = "environmentd".to_string();
273
274 if let Err(e) = dump_task
275 .dump_prometheus_metrics(
276 &format!(
277 "{}:{}/{}",
278 emulator_context.container_ip,
279 get_default_port(&emulator_context.http_connection_auth_mode).prom_metrics_port,
280 PROM_METRICS_ENDPOINT
281 ),
282 &resource_name,
283 )
284 .await
285 {
286 warn!("Failed to dump prometheus metrics: {:#}", e);
287 }
288 }
289
290 Ok(())
291}
292
293pub async fn dump_self_managed_http_resources(
294 context: &Context,
295 self_managed_context: &SelfManagedContext,
296) -> Result<()> {
297 let http_client = reqwest::Client::new();
298 let dump_task = HttpDumpClient::new(
299 context,
300 &self_managed_context.http_connection_auth_mode,
301 &http_client,
302 );
303
304 let cluster_services = find_cluster_services(
305 &self_managed_context.k8s_client,
306 &self_managed_context.k8s_namespace,
307 &self_managed_context.mz_instance_name,
308 )
309 .await
310 .with_context(|| "Failed to find cluster services")?;
311
312 let environmentd_service = find_environmentd_service(
313 &self_managed_context.k8s_client,
314 &self_managed_context.k8s_namespace,
315 &self_managed_context.mz_instance_name,
316 )
317 .await
318 .with_context(|| "Failed to find environmentd service")?;
319
320 let services = cluster_services
321 .iter()
322 .map(|service| (service, ServiceType::Clusterd))
323 .chain(std::iter::once((
324 &environmentd_service,
325 ServiceType::Environmentd,
326 )));
327
328 for (service_info, service_type) in services {
330 let profiling_endpoint = get_profile_endpoint(&service_type);
331 let heap_profile_port_label = get_port_labels(
332 &self_managed_context.http_connection_auth_mode,
333 &service_type,
334 )
335 .heap_profile_port_label;
336
337 let prom_metrics_port_label = get_port_labels(
338 &self_managed_context.http_connection_auth_mode,
339 &service_type,
340 )
341 .prom_metrics_port_label;
342
343 let (heap_profile_http_connection, prom_metrics_http_connection) = {
344 let maybe_heap_profile_port = service_info
345 .service_ports
346 .iter()
347 .find_map(|port_info| find_http_port_by_label(port_info, heap_profile_port_label));
348 let maybe_prom_metrics_port = service_info
349 .service_ports
350 .iter()
351 .find_map(|port_info| find_http_port_by_label(port_info, prom_metrics_port_label));
352 if let (Some(heap_profile_port), Some(prom_metrics_port)) =
353 (maybe_heap_profile_port, maybe_prom_metrics_port)
354 {
355 let heap_profile_port_forwarder = KubectlPortForwarder {
356 context: self_managed_context.k8s_context.clone(),
357 namespace: service_info.namespace.clone(),
358 service_name: service_info.service_name.clone(),
359 target_port: heap_profile_port.port,
360 };
361 let heap_profile_http_connection = Arc::new(
362 heap_profile_port_forwarder
363 .spawn_port_forward()
364 .await
365 .with_context(|| {
366 format!(
367 "Failed to spawn port forwarder for service {}",
368 service_info.service_name
369 )
370 })?,
371 );
372 let prom_metrics_http_connection = if heap_profile_port == prom_metrics_port {
373 Arc::clone(&heap_profile_http_connection)
374 } else {
375 let prom_metrics_port_forwarder = KubectlPortForwarder {
376 context: self_managed_context.k8s_context.clone(),
377 namespace: service_info.namespace.clone(),
378 service_name: service_info.service_name.clone(),
379 target_port: prom_metrics_port.port,
380 };
381 Arc::new(
382 prom_metrics_port_forwarder
383 .spawn_port_forward()
384 .await
385 .with_context(|| {
386 format!(
387 "Failed to spawn port forwarder for service {}",
388 service_info.service_name
389 )
390 })?,
391 )
392 };
393
394 (heap_profile_http_connection, prom_metrics_http_connection)
395 } else {
396 return Err(anyhow::anyhow!(
397 "Failed to find HTTP port for service {}, heap_profile_port_label={}, prom_metrics_port_label={}",
398 service_info.service_name,
399 heap_profile_port_label,
400 prom_metrics_port_label
401 ));
402 }
403 };
404
405 if context.dump_heap_profiles {
406 let profiling_endpoint = format!(
407 "{}:{}/{}",
408 heap_profile_http_connection.local_address,
409 heap_profile_http_connection.local_port,
410 profiling_endpoint
411 );
412
413 info!(
414 "Dumping heap profile for service {}",
415 service_info.service_name
416 );
417 if let Err(e) = dump_task
418 .dump_heap_profile(&profiling_endpoint, &service_info.service_name)
419 .await
420 {
421 warn!(
422 "Failed to dump heap profile for service {}: {:#}",
423 service_info.service_name, e
424 );
425 }
426 }
427
428 if context.dump_prometheus_metrics {
429 let prom_metrics_endpoint = format!(
430 "{}:{}/{}",
431 prom_metrics_http_connection.local_address,
432 prom_metrics_http_connection.local_port,
433 PROM_METRICS_ENDPOINT
434 );
435 info!(
436 "Dumping prometheus metrics for service {}",
437 service_info.service_name
438 );
439 if let Err(e) = dump_task
440 .dump_prometheus_metrics(&prom_metrics_endpoint, &service_info.service_name)
441 .await
442 {
443 warn!(
444 "Failed to dump prometheus metrics for service {}: {:#}",
445 service_info.service_name, e
446 );
447 }
448 }
449 }
450
451 Ok(())
452}
453
454fn find_http_port_by_label<'a>(
455 port_info: &'a ServicePort,
456 target_port_label: &'static str,
457) -> Option<&'a ServicePort> {
458 if let Some(port_name) = &port_info.name {
459 let port_name = port_name.to_lowercase();
460 if port_name == target_port_label {
461 return Some(port_info);
462 }
463 }
464 None
465}