mz_prof_http/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Profiling HTTP endpoints.
11
12use std::env;
13use std::sync::LazyLock;
14use std::time::Duration;
15
16use askama::Template;
17use axum::response::IntoResponse;
18use axum::routing::{self, Router};
19use cfg_if::cfg_if;
20use http::StatusCode;
21use mz_build_info::BuildInfo;
22use mz_prof::StackProfileExt;
23use pprof_util::{ProfStartTime, StackProfile};
24
25cfg_if! {
26    if #[cfg(any(not(feature = "jemalloc"), miri))] {
27        use disabled::{handle_get, handle_post, handle_get_heap};
28    } else {
29        use enabled::{handle_get, handle_post, handle_get_heap};
30    }
31}
32
/// File name of the running executable, shown on the profiling landing page.
///
/// Resolved lazily on first access. Falls back to `"<unknown executable>"` when the executable
/// path cannot be determined or has no final file-name component.
static EXECUTABLE: LazyLock<String> = LazyLock::new(|| {
    let exe_path = env::current_exe().ok();
    exe_path
        .as_deref()
        .and_then(|path| path.file_name())
        .map_or_else(
            || String::from("<unknown executable>"),
            |name| name.to_string_lossy().into_owned(),
        )
});
43
// Static assets for the profiling pages; `router` mounts `handle_static` at `/static/*path`.
// NOTE(review): this invocation is presumably what defines `handle_static` — no other definition
// of it is visible in this file. `dir_1` embeds `src/http/static` from the crate sources and
// `dir_2` embeds it from `OUT_DIR`; how the macro chooses between them and between the
// `prod_base_path` / `dev_base_path` directories is decided inside `mz_http_util` — confirm there.
mz_http_util::make_handle_static!(
    dir_1: ::include_dir::include_dir!("$CARGO_MANIFEST_DIR/src/http/static"),
    dir_2: ::include_dir::include_dir!("$OUT_DIR/src/http/static"),
    prod_base_path: "src/http/static",
    dev_base_path: "src/http/static-dev",
);
50
51/// Creates a router that serves the profiling endpoints.
52pub fn router(build_info: &'static BuildInfo) -> Router {
53    Router::new()
54        .route(
55            "/",
56            routing::get(move |query, headers| handle_get(query, headers, build_info)),
57        )
58        .route(
59            "/",
60            routing::post(move |form| handle_post(form, build_info)),
61        )
62        .route("/heap", routing::get(handle_get_heap))
63        .route("/static/*path", routing::get(handle_static))
64}
65
/// Heap (jemalloc) profiling state reported on the profiling landing page.
// Not every variant is constructed in every build configuration: when jemalloc support is
// compiled out (or under Miri) only `Disabled` is ever built, so dead-code warnings are silenced.
#[allow(dead_code)]
enum MemProfilingStatus {
    /// Heap profiling is not available.
    Disabled,
    /// Heap profiling is available; carries the start time reported by the profiling
    /// controller's metadata. NOTE(review): `None` presumably means no profile is currently
    /// running — confirm against `JemallocProfCtl::get_md`.
    Enabled(Option<ProfStartTime>),
}
71
/// Template context for the profiling landing page (`prof.html`).
#[derive(Template)]
#[template(path = "prof.html")]
struct ProfTemplate<'a> {
    /// Build version string (taken from `BuildInfo::version`).
    version: &'a str,
    /// File name of the running executable.
    executable: &'a str,
    /// Whether heap profiling is available and, if so, its state.
    mem_prof: MemProfilingStatus,
    /// Value of `mz_prof::ever_symbolized()` at render time.
    ever_symbolized: bool,
}
80
/// Template context for a rendered flamegraph page (`flamegraph.html`).
#[derive(Template)]
#[template(path = "flamegraph.html")]
pub struct FlamegraphTemplate<'a> {
    /// Build version string (taken from `BuildInfo::version`).
    pub version: &'a str,
    /// Page title, e.g. "CPU Time Flamegraph".
    pub title: &'a str,
    /// The profile serialized with `StackProfileExt::to_mzfg`.
    pub mzfg: &'a str,
}
88
89#[allow(dropping_copy_types)]
90async fn time_prof(
91    merge_threads: bool,
92    build_info: &'static BuildInfo,
93    // the time in seconds to run the profiler for
94    time_secs: u64,
95    // the sampling frequency in Hz
96    sample_freq: u32,
97) -> impl IntoResponse + use<> {
98    let ctl_lock;
99    cfg_if! {
100        if #[cfg(any(not(feature = "jemalloc"), miri))] {
101            ctl_lock = ();
102        } else {
103            ctl_lock = if let Some(ctl) = jemalloc_pprof::PROF_CTL.as_ref() {
104                let mut borrow = ctl.lock().await;
105                borrow.deactivate().map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
106                Some(borrow)
107            } else {
108                None
109            };
110        }
111    }
112    // SAFETY: We ensure above that memory profiling is off.
113    // Since we hold the mutex, nobody else can be turning it back on in the intervening time.
114    let stacks = unsafe {
115        mz_prof::time::prof_time(Duration::from_secs(time_secs), sample_freq, merge_threads)
116    }
117    .await
118    .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
119    // Fail with a compile error if we weren't holding the jemalloc lock.
120    drop(ctl_lock);
121    let (secs_s, freq_s) = (format!("{time_secs}"), format!("{sample_freq}"));
122    Ok::<_, (StatusCode, String)>(flamegraph(
123        stacks,
124        "CPU Time Flamegraph",
125        false,
126        &[
127            ("Sampling time (s)", &secs_s),
128            ("Sampling frequency (Hz)", &freq_s),
129        ],
130        build_info,
131    ))
132}
133
134fn flamegraph<'a, 'b>(
135    stacks: StackProfile,
136    title: &'a str,
137    display_bytes: bool,
138    extras: &'b [(&'b str, &'b str)],
139    build_info: &'static BuildInfo,
140) -> impl IntoResponse + use<'a> {
141    let mut header_extra = vec![];
142    if display_bytes {
143        header_extra.push(("display_bytes", "1"));
144    }
145    for (k, v) in extras {
146        header_extra.push((k, v));
147    }
148    let mzfg = stacks.to_mzfg(true, &header_extra);
149    mz_http_util::template_response(FlamegraphTemplate {
150        version: build_info.version,
151        title,
152        mzfg: &mzfg,
153    })
154}
155
156#[cfg(any(not(feature = "jemalloc"), miri))]
157mod disabled {
158    use axum::extract::{Form, Query};
159    use axum::response::IntoResponse;
160    use http::StatusCode;
161    use http::header::HeaderMap;
162    use mz_build_info::BuildInfo;
163    use serde::Deserialize;
164
165    use mz_prof::ever_symbolized;
166
167    use super::{MemProfilingStatus, ProfTemplate, time_prof};
168
169    #[derive(Deserialize)]
170    pub struct ProfQuery {
171        _action: Option<String>,
172    }
173
174    #[allow(clippy::unused_async)]
175    pub async fn handle_get(
176        _: Query<ProfQuery>,
177        _: HeaderMap,
178        build_info: &'static BuildInfo,
179    ) -> impl IntoResponse {
180        mz_http_util::template_response(ProfTemplate {
181            version: build_info.version,
182            executable: &super::EXECUTABLE,
183            mem_prof: MemProfilingStatus::Disabled,
184            ever_symbolized: ever_symbolized(),
185        })
186    }
187
188    #[derive(Deserialize)]
189    pub struct ProfForm {
190        action: String,
191        threads: Option<String>,
192        time_secs: Option<u64>,
193        hz: Option<u32>,
194    }
195
196    pub async fn handle_post(
197        Form(ProfForm {
198            action,
199            threads,
200            time_secs,
201            hz,
202        }): Form<ProfForm>,
203        build_info: &'static BuildInfo,
204    ) -> impl IntoResponse {
205        let merge_threads = threads.as_deref() == Some("merge");
206        match action.as_ref() {
207            "time_fg" => {
208                let time_secs = time_secs.ok_or_else(|| {
209                    (
210                        StatusCode::BAD_REQUEST,
211                        "Expected value for `time_secs`".to_owned(),
212                    )
213                })?;
214                let hz = hz.ok_or_else(|| {
215                    (
216                        StatusCode::BAD_REQUEST,
217                        "Expected value for `hz`".to_owned(),
218                    )
219                })?;
220
221                Ok(time_prof(merge_threads, build_info, time_secs, hz).await)
222            }
223            _ => Err((
224                StatusCode::BAD_REQUEST,
225                format!("unrecognized `action` parameter: {}", action),
226            )),
227        }
228    }
229
230    #[allow(clippy::unused_async)]
231    pub async fn handle_get_heap() -> Result<(), (StatusCode, String)> {
232        Err((
233            StatusCode::BAD_REQUEST,
234            "This software was compiled without heap profiling support.".to_string(),
235        ))
236    }
237}
238
/// Profiling handlers for builds with jemalloc heap-profiling support.
///
/// Heap-profiling features go through the process-wide controller `PROF_CTL`. When that
/// controller is absent at runtime (`PROF_CTL` is `None`), the handlers do not panic. Instead:
///
/// * the landing page reports memory profiling as disabled,
/// * heap-profiling actions respond with `400 Bad Request`,
/// * CPU-time profiling keeps working.
#[cfg(all(feature = "jemalloc", not(miri)))]
mod enabled {
    use std::fmt::Display;
    use std::io::{BufReader, Read};
    use std::sync::Arc;

    use axum::extract::{Form, Query};
    use axum::response::IntoResponse;
    use axum_extra::TypedHeader;
    use bytesize::ByteSize;
    use headers::ContentType;
    use http::header::{CONTENT_DISPOSITION, HeaderMap};
    use http::{HeaderValue, StatusCode};
    use jemalloc_pprof::{JemallocProfCtl, PROF_CTL};
    use mappings::MAPPINGS;
    use mz_build_info::BuildInfo;
    use mz_ore::cast::CastFrom;
    use mz_prof::jemalloc::{JemallocProfCtlExt, JemallocStats};
    use mz_prof::{StackProfileExt, ever_symbolized};
    use pprof_util::{StackProfile, parse_jeheap};
    use serde::Deserialize;
    use tokio::sync::Mutex;

    use super::{MemProfilingStatus, ProfTemplate, flamegraph, time_prof};

    /// Form body accepted by `POST /`.
    #[derive(Deserialize)]
    pub struct ProfForm {
        action: String,
        threads: Option<String>,
        time_secs: Option<u64>,
        hz: Option<u32>,
    }

    /// Handles the landing page's form actions:
    ///
    /// * `activate` / `deactivate` — toggle jemalloc heap profiling, then re-render the page.
    /// * `dump_jeheap` — download the raw jemalloc heap dump as `jeprof.heap`.
    /// * `dump_sym_mzfg` — download the parsed heap profile, with allocator stats in its
    ///   header, as `trace.mzfg`.
    /// * `mem_fg` — render the heap profile as a flamegraph page.
    /// * `time_fg` — take a CPU-time profile of `time_secs` seconds at `hz` Hz and render it as
    ///   a flamegraph page; `threads=merge` sets `merge_threads`.
    ///
    /// # Errors
    ///
    /// * `400 Bad Request` in three cases: an unknown action; a missing `time_secs`/`hz` for
    ///   `time_fg`; a heap action when no profiling controller exists.
    /// * `403 Forbidden` for a heap dump while heap profiling is not activated.
    /// * `500 Internal Server Error` when the profiler or allocator reports a failure.
    pub async fn handle_post(
        Form(ProfForm {
            action,
            threads,
            time_secs,
            hz,
        }): Form<ProfForm>,
        build_info: &'static BuildInfo,
    ) -> impl IntoResponse {
        let merge_threads = threads.as_deref() == Some("merge");

        match action.as_str() {
            "activate" => {
                let prof_ctl = available_prof_ctl()?;
                // The guard is a temporary released at the end of this statement, so
                // `render_template` can lock the (non-reentrant) mutex again below.
                prof_ctl.lock().await.activate().map_err(internal_error)?;
                Ok(render_template(Some(prof_ctl), build_info)
                    .await
                    .into_response())
            }
            "deactivate" => {
                let prof_ctl = available_prof_ctl()?;
                // As above: the guard must be gone before `render_template` locks again.
                prof_ctl.lock().await.deactivate().map_err(internal_error)?;
                Ok(render_template(Some(prof_ctl), build_info)
                    .await
                    .into_response())
            }
            "dump_jeheap" => {
                let mut borrow = available_prof_ctl()?.lock().await;
                require_profiling_activated(&borrow)?;
                let mut s = String::new();
                borrow
                    .dump()
                    .map_err(internal_error)?
                    .read_to_string(&mut s)
                    .map_err(internal_error)?;
                Ok((attachment("attachment; filename=\"jeprof.heap\""), s).into_response())
            }
            "dump_sym_mzfg" => {
                let mut borrow = available_prof_ctl()?.lock().await;
                let stacks = dump_heap_profile(&mut borrow)?;
                let stats = heap_stats(&mut borrow)?;
                let mut header = as_str_pairs(&stats);
                header.push(("display_bytes", "1"));
                let mzfg = stacks.to_mzfg(true, &header);
                Ok((attachment("attachment; filename=\"trace.mzfg\""), mzfg).into_response())
            }
            "mem_fg" => {
                let mut borrow = available_prof_ctl()?.lock().await;
                let stacks = dump_heap_profile(&mut borrow)?;
                let stats = heap_stats(&mut borrow)?;
                let extras = as_str_pairs(&stats);
                Ok(
                    flamegraph(stacks, "Heap Flamegraph", true, &extras, build_info)
                        .into_response(),
                )
            }
            "time_fg" => {
                let time_secs = time_secs.ok_or_else(|| {
                    (
                        StatusCode::BAD_REQUEST,
                        "Expected value for `time_secs`".to_owned(),
                    )
                })?;
                let hz = hz.ok_or_else(|| {
                    (
                        StatusCode::BAD_REQUEST,
                        "Expected value for `hz`".to_owned(),
                    )
                })?;
                // CPU profiling does not need the heap controller; `time_prof` copes with it
                // being absent.
                Ok(time_prof(merge_threads, build_info, time_secs, hz)
                    .await
                    .into_response())
            }
            x => Err((
                StatusCode::BAD_REQUEST,
                format!("unrecognized `action` parameter: {}", x),
            )),
        }
    }

    /// Query string accepted by `GET /`.
    #[derive(Deserialize)]
    pub struct ProfQuery {
        action: Option<String>,
    }

    /// Serves `GET /`. Without a query it renders the landing page.
    ///
    /// `?action=dump_stats` returns jemalloc's stats dump. The dump is JSON when the `Accept`
    /// header is exactly `application/json`, and plain text otherwise.
    ///
    /// # Errors
    ///
    /// * `400 Bad Request` for any other action, or for `dump_stats` when no profiling
    ///   controller exists.
    /// * `500 Internal Server Error` if the stats dump fails.
    pub async fn handle_get(
        Query(query): Query<ProfQuery>,
        headers: HeaderMap,
        build_info: &'static BuildInfo,
    ) -> impl IntoResponse {
        match query.action.as_deref() {
            Some("dump_stats") => {
                let json = headers
                    .get("accept")
                    .is_some_and(|accept| accept.as_bytes() == b"application/json");
                let mut borrow = available_prof_ctl()?.lock().await;
                let s = borrow.dump_stats(json).map_err(internal_error)?;
                let content_type = if json {
                    ContentType::json()
                } else {
                    ContentType::text()
                };
                Ok((TypedHeader(content_type), s).into_response())
            }
            Some(x) => Err((
                StatusCode::BAD_REQUEST,
                format!("unrecognized query: {}", x),
            )),
            None => Ok(render_template(PROF_CTL.as_ref(), build_info)
                .await
                .into_response()),
        }
    }

    /// Serves `GET /heap`: the current heap profile encoded with `to_pprof` (in-use space, in
    /// bytes).
    ///
    /// # Errors
    ///
    /// * `400 Bad Request` when no profiling controller exists.
    /// * `403 Forbidden` when heap profiling is not activated.
    /// * `500 Internal Server Error` when dumping or parsing the profile fails.
    pub async fn handle_get_heap() -> Result<impl IntoResponse, (StatusCode, String)> {
        let mut prof_ctl = available_prof_ctl()?.lock().await;
        let profile = dump_heap_profile(&mut prof_ctl)?;
        let pprof = profile.to_pprof(("inuse_space", "bytes"), ("space", "bytes"), None);
        Ok(pprof)
    }

    /// Renders the landing page. `prof_ctl` is `None` when no heap-profiling controller exists,
    /// in which case memory profiling is reported as disabled.
    ///
    /// Locks `prof_ctl`, so callers must not be holding its lock.
    async fn render_template(
        prof_ctl: Option<&Arc<Mutex<JemallocProfCtl>>>,
        build_info: &'static BuildInfo,
    ) -> impl IntoResponse {
        let mem_prof = match prof_ctl {
            Some(prof_ctl) => {
                let prof_md = prof_ctl.lock().await.get_md();
                MemProfilingStatus::Enabled(prof_md.start_time)
            }
            None => MemProfilingStatus::Disabled,
        };
        mz_http_util::template_response(ProfTemplate {
            version: build_info.version,
            executable: &super::EXECUTABLE,
            mem_prof,
            ever_symbolized: ever_symbolized(),
        })
    }

    /// Returns the process-wide profiling controller, or a `400 Bad Request` response when it
    /// does not exist.
    fn available_prof_ctl() -> Result<&'static Arc<Mutex<JemallocProfCtl>>, (StatusCode, String)>
    {
        PROF_CTL.as_ref().ok_or_else(|| {
            (
                StatusCode::BAD_REQUEST,
                "heap profiling is not available in this process".to_owned(),
            )
        })
    }

    /// Checks whether jemalloc profiling is activated and returns an error response if not.
    fn require_profiling_activated(prof_ctl: &JemallocProfCtl) -> Result<(), (StatusCode, String)> {
        if prof_ctl.activated() {
            Ok(())
        } else {
            Err((StatusCode::FORBIDDEN, "heap profiling not activated".into()))
        }
    }

    /// Dumps the current heap profile and parses it with `parse_jeheap`, passing the process's
    /// memory mappings (`MAPPINGS`). Requires heap profiling to be activated.
    fn dump_heap_profile(
        prof_ctl: &mut JemallocProfCtl,
    ) -> Result<StackProfile, (StatusCode, String)> {
        require_profiling_activated(prof_ctl)?;
        let dump = prof_ctl.dump().map_err(internal_error)?;
        parse_jeheap(BufReader::new(dump), MAPPINGS.as_deref()).map_err(internal_error)
    }

    /// Reads jemalloc's allocator statistics and renders them as human-readable labels/sizes.
    fn heap_stats(
        prof_ctl: &mut JemallocProfCtl,
    ) -> Result<Vec<(&'static str, String)>, (StatusCode, String)> {
        let stats = prof_ctl.stats().map_err(internal_error)?;
        Ok(render_jemalloc_stats(&stats))
    }

    /// Formats allocator statistics as `(label, SI-formatted byte size)` pairs.
    fn render_jemalloc_stats(stats: &JemallocStats) -> Vec<(&'static str, String)> {
        let stats = [
            ("Allocated", stats.allocated),
            ("In active pages", stats.active),
            ("Allocated for allocator metadata", stats.metadata),
            (
                "Maximum number of bytes in physically resident data pages mapped by the allocator",
                stats.resident,
            ),
            ("Bytes unused, but retained by allocator", stats.retained),
        ];
        stats
            .into_iter()
            .map(|(k, v)| (k, ByteSize(u64::cast_from(v)).display().si().to_string()))
            .collect()
    }

    /// Borrows `(label, value)` pairs as string slices, the form mzfg headers take.
    fn as_str_pairs<'a>(pairs: &'a [(&'static str, String)]) -> Vec<(&'a str, &'a str)> {
        pairs.iter().map(|(k, v)| (*k, v.as_str())).collect()
    }

    /// Headers that turn a response into a download with the given `Content-Disposition`.
    fn attachment(disposition: &'static str) -> HeaderMap {
        HeaderMap::from_iter([(CONTENT_DISPOSITION, HeaderValue::from_static(disposition))])
    }

    /// Maps any displayable error to a `500 Internal Server Error` response.
    fn internal_error<E: Display>(err: E) -> (StatusCode, String) {
        (StatusCode::INTERNAL_SERVER_ERROR, err.to_string())
    }
}