mz_debug/
internal_http_dumper.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Dumps internal http debug information to files.
11use anyhow::{Context as AnyhowContext, Result};
12use futures::StreamExt;
13use k8s_openapi::api::core::v1::ServicePort;
14use reqwest::header::{HeaderMap, HeaderValue};
15use std::path::Path;
16use std::sync::Arc;
17use tokio::fs::{File, create_dir_all};
18use tokio::io::AsyncWriteExt;
19use tracing::{info, warn};
20use url::Url;
21
22use crate::kubectl_port_forwarder::{
23    KubectlPortForwarder, find_cluster_services, find_environmentd_service,
24};
25use crate::{AuthMode, Context, EmulatorContext, PasswordAuthCredentials, SelfManagedContext};
26
/// Subdirectory of the dump directory (`Context::base_path`) that receives heap profiles.
const HEAP_PROFILES_DIR: &str = "profiles";
/// Subdirectory of the dump directory (`Context::base_path`) that receives Prometheus metrics.
const PROM_METRICS_DIR: &str = "prom_metrics";
/// HTTP path, relative to a service's `host:port`, that serves Prometheus metrics.
const PROM_METRICS_ENDPOINT: &str = "metrics";
/// HTTP path, relative to environmentd's `host:port`, that serves heap profiles.
const ENVD_HEAP_PROFILE_ENDPOINT: &str = "prof/heap";
/// HTTP path, relative to clusterd's `host:port`, that serves heap profiles.
const CLUSTERD_HEAP_PROFILE_ENDPOINT: &str = "heap";
/// The default port for the external HTTP endpoint.
const DEFAULT_EXTERNAL_HTTP_PORT: i32 = 6877;
/// The default port for the internal HTTP endpoint.
const DEFAULT_INTERNAL_HTTP_PORT: i32 = 6878;

/// Service port name identifying the internal HTTP port.
const INTERNAL_HTTP_PORT_LABEL: &str = "internal-http";
/// Service port name identifying the external HTTP port.
///
/// Even when TLS is not in use, the external HTTP port is named "https".
const EXTERNAL_HTTP_PORT_LABEL: &str = "https";
42
/// The kind of Materialize service being scraped. It selects the heap-profile endpoint and the
/// HTTP port names to use for that service.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServiceType {
    /// A clusterd service.
    Clusterd,
    /// The environmentd service.
    Environmentd,
}
47
48fn get_profile_endpoint(service_type: &ServiceType) -> &'static str {
49    match service_type {
50        ServiceType::Clusterd => CLUSTERD_HEAP_PROFILE_ENDPOINT,
51        ServiceType::Environmentd => ENVD_HEAP_PROFILE_ENDPOINT,
52    }
53}
54
/// Names of the Kubernetes service ports to scrape for one service.
#[derive(Clone, Debug)]
struct HttpPortLabels {
    /// Name of the service port that serves heap profiles.
    heap_profile_port_label: &'static str,
    /// Name of the service port that serves Prometheus metrics.
    prom_metrics_port_label: &'static str,
}
60
61fn get_port_labels(auth_mode: &AuthMode, service_type: &ServiceType) -> HttpPortLabels {
62    match (auth_mode, service_type) {
63        (AuthMode::None, ServiceType::Clusterd)
64        | (AuthMode::None, ServiceType::Environmentd)
65        // Even if in the password listener config, the heap profile port is specified as external, clusterd will
66        // still use the internal port.
67        | (AuthMode::Password(_), ServiceType::Clusterd) => HttpPortLabels {
68            heap_profile_port_label: INTERNAL_HTTP_PORT_LABEL,
69            prom_metrics_port_label: INTERNAL_HTTP_PORT_LABEL,
70        },
71        (AuthMode::Password(_), ServiceType::Environmentd) => HttpPortLabels {
72            heap_profile_port_label: EXTERNAL_HTTP_PORT_LABEL,
73            prom_metrics_port_label: INTERNAL_HTTP_PORT_LABEL,
74        },
75    }
76}
77
/// Default port numbers for the heap-profile and Prometheus-metrics endpoints.
struct HttpDefaultPorts {
    /// Port assumed to serve heap profiles.
    heap_profile_port: i32,
    /// Port assumed to serve Prometheus metrics.
    prom_metrics_port: i32,
}
82
83fn get_default_port(auth_mode: &AuthMode) -> HttpDefaultPorts {
84    match auth_mode {
85        AuthMode::None => HttpDefaultPorts {
86            heap_profile_port: DEFAULT_INTERNAL_HTTP_PORT,
87            prom_metrics_port: DEFAULT_INTERNAL_HTTP_PORT,
88        },
89        AuthMode::Password(_) => HttpDefaultPorts {
90            heap_profile_port: DEFAULT_EXTERNAL_HTTP_PORT,
91            prom_metrics_port: DEFAULT_INTERNAL_HTTP_PORT,
92        },
93    }
94}
95
96/// A struct that handles downloading and saving profile data from HTTP endpoints.
97pub struct HttpDumpClient<'n> {
98    context: &'n Context,
99    auth_mode: &'n AuthMode,
100    http_client: &'n reqwest::Client,
101}
102
103/// A struct that handles downloading and exporting data from our internal HTTP endpoints.
104impl<'n> HttpDumpClient<'n> {
105    pub fn new(
106        context: &'n Context,
107        auth_mode: &'n AuthMode,
108        http_client: &'n reqwest::Client,
109    ) -> Self {
110        Self {
111            context,
112            auth_mode,
113            http_client,
114        }
115    }
116
117    async fn dump_request_to_file(
118        &self,
119        relative_url: &str,
120        headers: HeaderMap,
121        output_path: &Path,
122    ) -> Result<(), anyhow::Error> {
123        let create_request = |url: &Url| {
124            let mut request_builder = self
125                .http_client
126                .get(url.to_string())
127                .headers(headers.clone());
128
129            if let AuthMode::Password(PasswordAuthCredentials { username, password }) =
130                self.auth_mode
131            {
132                request_builder = request_builder.basic_auth(&username, Some(&password));
133            }
134
135            request_builder
136        };
137        // Try HTTPS first, then fall back to HTTP if that fails
138        let mut url = Url::parse(&format!("https://{}", relative_url))
139            .with_context(|| format!("Failed to parse URL: https://{}", relative_url))?;
140
141        let request = create_request(&url);
142
143        let mut response = request.send().await;
144
145        if response.is_err() {
146            // Fall back to HTTP if HTTPS fails
147            let _ = url.set_scheme("http");
148
149            response = create_request(&url).send().await;
150        }
151
152        let response = response.with_context(|| format!("Failed to send request to {}", url))?;
153
154        if !response.status().is_success() {
155            return Err(anyhow::anyhow!(
156                "Failed to get response from {}: {}",
157                url,
158                response.status()
159            ));
160        }
161
162        let mut file = File::create(output_path)
163            .await
164            .with_context(|| format!("Failed to create file: {}", output_path.display()))?;
165
166        let mut stream = response.bytes_stream();
167        while let Some(chunk) = stream.next().await {
168            let chunk = chunk.with_context(|| "Failed to read chunk from response")?;
169            file.write_all(&chunk)
170                .await
171                .with_context(|| "Failed to write chunk to file")?;
172        }
173
174        file.flush()
175            .await
176            .with_context(|| "Failed to flush file contents")?;
177
178        Ok(())
179    }
180
181    /// Downloads and saves heap profile data
182    pub async fn dump_heap_profile(&self, relative_url: &str, service_name: &str) -> Result<()> {
183        let output_dir = self.context.base_path.join(HEAP_PROFILES_DIR);
184        create_dir_all(&output_dir).await.with_context(|| {
185            format!(
186                "Failed to create output directory: {}",
187                output_dir.display()
188            )
189        })?;
190        let output_path = output_dir.join(format!("{}.memprof.pprof.gz", service_name));
191
192        self.dump_request_to_file(
193            relative_url,
194            {
195                let mut headers = HeaderMap::new();
196                headers.insert(
197                    "Accept",
198                    HeaderValue::from_static("application/octet-stream"),
199                );
200                headers
201            },
202            &output_path,
203        )
204        .await
205        .with_context(|| format!("Failed to dump heap profile to {}", output_path.display()))?;
206
207        Ok(())
208    }
209
210    pub async fn dump_prometheus_metrics(
211        &self,
212        relative_url: &str,
213        service_name: &str,
214    ) -> Result<()> {
215        let output_dir = self.context.base_path.join(PROM_METRICS_DIR);
216        create_dir_all(&output_dir).await.with_context(|| {
217            format!(
218                "Failed to create output directory: {}",
219                output_dir.display()
220            )
221        })?;
222
223        let output_path = output_dir.join(format!("{}.metrics.txt", service_name));
224        self.dump_request_to_file(
225            relative_url,
226            {
227                let mut headers = HeaderMap::new();
228                headers.insert("Accept", HeaderValue::from_static("text/plain"));
229                headers
230            },
231            &output_path,
232        )
233        .await?;
234
235        Ok(())
236    }
237}
238
239// TODO (debug_tool3): Scrape cluster profiles through a proxy when (database-issues#7049) is implemented
240pub async fn dump_emulator_http_resources(
241    context: &Context,
242    emulator_context: &EmulatorContext,
243) -> Result<()> {
244    let http_client = reqwest::Client::new();
245    let dump_task = HttpDumpClient::new(
246        context,
247        &emulator_context.http_connection_auth_mode,
248        &http_client,
249    );
250
251    if context.dump_heap_profiles {
252        let resource_name = "environmentd".to_string();
253
254        // We assume the emulator is exposed on the local network and uses port 6878.
255        if let Err(e) = dump_task
256            .dump_heap_profile(
257                &format!(
258                    "{}:{}/{}",
259                    emulator_context.container_ip,
260                    get_default_port(&emulator_context.http_connection_auth_mode).heap_profile_port,
261                    ENVD_HEAP_PROFILE_ENDPOINT
262                ),
263                &resource_name,
264            )
265            .await
266        {
267            warn!("Failed to dump heap profile: {:#}", e);
268        }
269    }
270
271    if context.dump_prometheus_metrics {
272        let resource_name = "environmentd".to_string();
273
274        if let Err(e) = dump_task
275            .dump_prometheus_metrics(
276                &format!(
277                    "{}:{}/{}",
278                    emulator_context.container_ip,
279                    get_default_port(&emulator_context.http_connection_auth_mode).prom_metrics_port,
280                    PROM_METRICS_ENDPOINT
281                ),
282                &resource_name,
283            )
284            .await
285        {
286            warn!("Failed to dump prometheus metrics: {:#}", e);
287        }
288    }
289
290    Ok(())
291}
292
293pub async fn dump_self_managed_http_resources(
294    context: &Context,
295    self_managed_context: &SelfManagedContext,
296) -> Result<()> {
297    let http_client = reqwest::Client::new();
298    let dump_task = HttpDumpClient::new(
299        context,
300        &self_managed_context.http_connection_auth_mode,
301        &http_client,
302    );
303
304    let cluster_services = find_cluster_services(
305        &self_managed_context.k8s_client,
306        &self_managed_context.k8s_namespace,
307        &self_managed_context.mz_instance_name,
308    )
309    .await
310    .with_context(|| "Failed to find cluster services")?;
311
312    let environmentd_service = find_environmentd_service(
313        &self_managed_context.k8s_client,
314        &self_managed_context.k8s_namespace,
315        &self_managed_context.mz_instance_name,
316    )
317    .await
318    .with_context(|| "Failed to find environmentd service")?;
319
320    let services = cluster_services
321        .iter()
322        .map(|service| (service, ServiceType::Clusterd))
323        .chain(std::iter::once((
324            &environmentd_service,
325            ServiceType::Environmentd,
326        )));
327
328    // Scrape each service
329    for (service_info, service_type) in services {
330        let profiling_endpoint = get_profile_endpoint(&service_type);
331        let heap_profile_port_label = get_port_labels(
332            &self_managed_context.http_connection_auth_mode,
333            &service_type,
334        )
335        .heap_profile_port_label;
336
337        let prom_metrics_port_label = get_port_labels(
338            &self_managed_context.http_connection_auth_mode,
339            &service_type,
340        )
341        .prom_metrics_port_label;
342
343        let (heap_profile_http_connection, prom_metrics_http_connection) = {
344            let maybe_heap_profile_port = service_info
345                .service_ports
346                .iter()
347                .find_map(|port_info| find_http_port_by_label(port_info, heap_profile_port_label));
348            let maybe_prom_metrics_port = service_info
349                .service_ports
350                .iter()
351                .find_map(|port_info| find_http_port_by_label(port_info, prom_metrics_port_label));
352            if let (Some(heap_profile_port), Some(prom_metrics_port)) =
353                (maybe_heap_profile_port, maybe_prom_metrics_port)
354            {
355                let heap_profile_port_forwarder = KubectlPortForwarder {
356                    context: self_managed_context.k8s_context.clone(),
357                    namespace: service_info.namespace.clone(),
358                    service_name: service_info.service_name.clone(),
359                    target_port: heap_profile_port.port,
360                };
361                let heap_profile_http_connection = Arc::new(
362                    heap_profile_port_forwarder
363                        .spawn_port_forward()
364                        .await
365                        .with_context(|| {
366                            format!(
367                                "Failed to spawn port forwarder for service {}",
368                                service_info.service_name
369                            )
370                        })?,
371                );
372                let prom_metrics_http_connection = if heap_profile_port == prom_metrics_port {
373                    Arc::clone(&heap_profile_http_connection)
374                } else {
375                    let prom_metrics_port_forwarder = KubectlPortForwarder {
376                        context: self_managed_context.k8s_context.clone(),
377                        namespace: service_info.namespace.clone(),
378                        service_name: service_info.service_name.clone(),
379                        target_port: prom_metrics_port.port,
380                    };
381                    Arc::new(
382                        prom_metrics_port_forwarder
383                            .spawn_port_forward()
384                            .await
385                            .with_context(|| {
386                                format!(
387                                    "Failed to spawn port forwarder for service {}",
388                                    service_info.service_name
389                                )
390                            })?,
391                    )
392                };
393
394                (heap_profile_http_connection, prom_metrics_http_connection)
395            } else {
396                return Err(anyhow::anyhow!(
397                    "Failed to find HTTP port for service {}, heap_profile_port_label={}, prom_metrics_port_label={}",
398                    service_info.service_name,
399                    heap_profile_port_label,
400                    prom_metrics_port_label
401                ));
402            }
403        };
404
405        if context.dump_heap_profiles {
406            let profiling_endpoint = format!(
407                "{}:{}/{}",
408                heap_profile_http_connection.local_address,
409                heap_profile_http_connection.local_port,
410                profiling_endpoint
411            );
412
413            info!(
414                "Dumping heap profile for service {}",
415                service_info.service_name
416            );
417            if let Err(e) = dump_task
418                .dump_heap_profile(&profiling_endpoint, &service_info.service_name)
419                .await
420            {
421                warn!(
422                    "Failed to dump heap profile for service {}: {:#}",
423                    service_info.service_name, e
424                );
425            }
426        }
427
428        if context.dump_prometheus_metrics {
429            let prom_metrics_endpoint = format!(
430                "{}:{}/{}",
431                prom_metrics_http_connection.local_address,
432                prom_metrics_http_connection.local_port,
433                PROM_METRICS_ENDPOINT
434            );
435            info!(
436                "Dumping prometheus metrics for service {}",
437                service_info.service_name
438            );
439            if let Err(e) = dump_task
440                .dump_prometheus_metrics(&prom_metrics_endpoint, &service_info.service_name)
441                .await
442            {
443                warn!(
444                    "Failed to dump prometheus metrics for service {}: {:#}",
445                    service_info.service_name, e
446                );
447            }
448        }
449    }
450
451    Ok(())
452}
453
454fn find_http_port_by_label<'a>(
455    port_info: &'a ServicePort,
456    target_port_label: &'static str,
457) -> Option<&'a ServicePort> {
458    if let Some(port_name) = &port_info.name {
459        let port_name = port_name.to_lowercase();
460        if port_name == target_port_label {
461            return Some(port_info);
462        }
463    }
464    None
465}