mz_prof_http/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Profiling HTTP endpoints.
11
12use std::env;
13use std::sync::LazyLock;
14use std::time::Duration;
15
16use askama::Template;
17use axum::response::IntoResponse;
18use axum::routing::{self, Router};
19use cfg_if::cfg_if;
20use http::StatusCode;
21use mz_build_info::BuildInfo;
22use mz_prof::StackProfileExt;
23use pprof_util::{ProfStartTime, StackProfile};
24
25cfg_if! {
26    if #[cfg(any(not(feature = "jemalloc"), miri))] {
27        use disabled::{handle_get, handle_post, handle_get_heap};
28    } else {
29        use enabled::{handle_get, handle_post, handle_get_heap};
30    }
31}
32
/// File name of the running executable, displayed on the profiling page.
///
/// Computed once on first access. Falls back to `<unknown executable>` when the executable
/// path cannot be determined or has no file-name component.
static EXECUTABLE: LazyLock<String> = LazyLock::new(|| {
    const FALLBACK: &str = "<unknown executable>";
    let exe_path = match env::current_exe() {
        Ok(path) => path,
        Err(_) => return FALLBACK.to_owned(),
    };
    match exe_path.file_name() {
        Some(name) => name.to_string_lossy().into_owned(),
        None => FALLBACK.to_owned(),
    }
});
43
// Embeds the profiling UI's static assets via `include_dir!`: one tree from the crate's
// `src/http/static` directory and one from the build script's `$OUT_DIR`. The macro
// presumably defines the `handle_static` handler that `router` mounts at `/static/*path`
// (it is not defined or imported anywhere else in this file). `dev_base_path` looks like the
// on-disk location used for development builds — confirm against `mz_http_util`.
mz_http_util::make_handle_static!(
    dir_1: ::include_dir::include_dir!("$CARGO_MANIFEST_DIR/src/http/static"),
    dir_2: ::include_dir::include_dir!("$OUT_DIR/src/http/static"),
    prod_base_path: "src/http/static",
    dev_base_path: "src/http/static-dev",
);
50
51/// Creates a router that serves the profiling endpoints.
52pub fn router(build_info: &'static BuildInfo) -> Router {
53    Router::new()
54        .route(
55            "/",
56            routing::get(move |query, headers| handle_get(query, headers, build_info)),
57        )
58        .route(
59            "/",
60            routing::post(move |form| handle_post(form, build_info)),
61        )
62        .route("/heap", routing::get(handle_get_heap))
63        .route("/static/*path", routing::get(handle_static))
64}
65
/// Heap (jemalloc) profiling status reported on the profiling page.
// `Enabled` is never constructed in builds without jemalloc (the `disabled` module only uses
// `Disabled`), so one variant is dead code in some build configurations.
#[allow(dead_code)]
enum MemProfilingStatus {
    /// Heap profiling is not available.
    Disabled,
    /// Heap profiling is available. Carries the start time from the profiling controller's
    /// metadata (presumably `None` while profiling is not active — confirm).
    Enabled(Option<ProfStartTime>),
}
71
/// Context for the `prof.html` profiling page. Field names are referenced by the template.
#[derive(Template)]
#[template(path = "prof.html")]
struct ProfTemplate<'a> {
    /// Version string from the build info.
    version: &'a str,
    /// File name of the running executable.
    executable: &'a str,
    /// Heap profiling status.
    mem_prof: MemProfilingStatus,
    /// Value of `mz_prof::ever_symbolized()` at render time.
    ever_symbolized: bool,
}
80
/// Context for the `flamegraph.html` page. Field names are referenced by the template.
#[derive(Template)]
#[template(path = "flamegraph.html")]
pub struct FlamegraphTemplate<'a> {
    /// Version string from the build info.
    pub version: &'a str,
    /// Page title.
    pub title: &'a str,
    /// Flamegraph data in mzfg form, as produced by `to_mzfg`.
    pub mzfg: &'a str,
}
88
/// Runs the CPU profiler and renders the collected stacks as a "CPU Time Flamegraph" page.
///
/// * `merge_threads` is forwarded to `mz_prof::time::prof_time`.
/// * `build_info` supplies the version shown on the page.
/// * `time_secs` is how long to sample for, in seconds.
/// * `sample_freq` is the sampling frequency, in Hz.
///
/// In builds with jemalloc, heap profiling (when `PROF_CTL` is available) is deactivated first.
/// The profiling-control lock is then held until sampling finishes, because CPU profiling must
/// not run while memory profiling is on (see the `SAFETY` comment below).
///
/// NOTE(review): heap profiling is not re-activated afterwards. If it was on before this call,
/// it stays off — confirm this is intended.
///
/// # Errors
///
/// Returns `500 Internal Server Error` with the error text if deactivating heap profiling or
/// running the CPU profiler fails.
// Without jemalloc, `ctl_lock` is `()`, so the explicit `drop` below trips the
// `dropping_copy_types` lint.
#[allow(dropping_copy_types)]
async fn time_prof(
    merge_threads: bool,
    build_info: &'static BuildInfo,
    // the time in seconds to run the profiler for
    time_secs: u64,
    // the sampling frequency in Hz
    sample_freq: u32,
) -> impl IntoResponse + use<> {
    // Holds the jemalloc profiling-controller guard (`Option<MutexGuard>`), or `()` without
    // jemalloc. It must stay alive for the entire CPU-profiling run.
    let ctl_lock;
    cfg_if! {
        if #[cfg(any(not(feature = "jemalloc"), miri))] {
            ctl_lock = ();
        } else {
            ctl_lock = if let Some(ctl) = jemalloc_pprof::PROF_CTL.as_ref() {
                let mut borrow = ctl.lock().await;
                borrow.deactivate().map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
                Some(borrow)
            } else {
                None
            };
        }
    }
    // SAFETY: We ensure above that memory profiling is off.
    // Since we hold the mutex, nobody else can be turning it back on in the intervening time.
    let stacks = unsafe {
        mz_prof::time::prof_time(Duration::from_secs(time_secs), sample_freq, merge_threads)
    }
    .await
    .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
    // Release the lock guard only now that profiling has finished, so it was held for the
    // whole run. Naming `ctl_lock` here also turns removal of the binding above into a
    // compile error.
    drop(ctl_lock);
    let (secs_s, freq_s) = (format!("{time_secs}"), format!("{sample_freq}"));
    Ok::<_, (StatusCode, String)>(flamegraph(
        stacks,
        "CPU Time Flamegraph",
        false,
        &[
            ("Sampling time (s)", &secs_s),
            ("Sampling frequency (Hz)", &freq_s),
        ],
        build_info,
    ))
}
133
134fn flamegraph<'a, 'b>(
135    stacks: StackProfile,
136    title: &'a str,
137    display_bytes: bool,
138    extras: &'b [(&'b str, &'b str)],
139    build_info: &'static BuildInfo,
140) -> impl IntoResponse + use<'a> {
141    let mut header_extra = vec![];
142    if display_bytes {
143        header_extra.push(("display_bytes", "1"));
144    }
145    for (k, v) in extras {
146        header_extra.push((k, v));
147    }
148    let mzfg = stacks.to_mzfg(true, &header_extra);
149    mz_http_util::template_response(FlamegraphTemplate {
150        version: build_info.version,
151        title,
152        mzfg: &mzfg,
153    })
154}
155
156#[cfg(any(not(feature = "jemalloc"), miri))]
157mod disabled {
158    use axum::extract::{Form, Query};
159    use axum::response::IntoResponse;
160    use http::StatusCode;
161    use http::header::HeaderMap;
162    use mz_build_info::BuildInfo;
163    use serde::Deserialize;
164
165    use mz_prof::ever_symbolized;
166
167    use super::{MemProfilingStatus, ProfTemplate, time_prof};
168
169    #[derive(Deserialize)]
170    pub struct ProfQuery {
171        _action: Option<String>,
172    }
173
174    #[allow(clippy::unused_async)]
175    pub async fn handle_get(
176        _: Query<ProfQuery>,
177        _: HeaderMap,
178        build_info: &'static BuildInfo,
179    ) -> impl IntoResponse {
180        mz_http_util::template_response(ProfTemplate {
181            version: build_info.version,
182            executable: &super::EXECUTABLE,
183            mem_prof: MemProfilingStatus::Disabled,
184            ever_symbolized: ever_symbolized(),
185        })
186    }
187
188    #[derive(Deserialize)]
189    pub struct ProfForm {
190        action: String,
191        threads: Option<String>,
192        time_secs: Option<u64>,
193        hz: Option<u32>,
194    }
195
196    pub async fn handle_post(
197        Form(ProfForm {
198            action,
199            threads,
200            time_secs,
201            hz,
202        }): Form<ProfForm>,
203        build_info: &'static BuildInfo,
204    ) -> impl IntoResponse {
205        let merge_threads = threads.as_deref() == Some("merge");
206        match action.as_ref() {
207            "time_fg" => {
208                let time_secs = time_secs.ok_or_else(|| {
209                    (
210                        StatusCode::BAD_REQUEST,
211                        "Expected value for `time_secs`".to_owned(),
212                    )
213                })?;
214                let hz = hz.ok_or_else(|| {
215                    (
216                        StatusCode::BAD_REQUEST,
217                        "Expected value for `hz`".to_owned(),
218                    )
219                })?;
220
221                Ok(time_prof(merge_threads, build_info, time_secs, hz).await)
222            }
223            _ => Err((
224                StatusCode::BAD_REQUEST,
225                format!("unrecognized `action` parameter: {}", action),
226            )),
227        }
228    }
229
230    #[allow(clippy::unused_async)]
231    pub async fn handle_get_heap() -> Result<(), (StatusCode, String)> {
232        Err((
233            StatusCode::BAD_REQUEST,
234            "This software was compiled without heap profiling support.".to_string(),
235        ))
236    }
237}
238
#[cfg(all(feature = "jemalloc", not(miri)))]
mod enabled {
    //! Handlers used when jemalloc heap profiling is compiled in.
    //!
    //! Heap profiling can still be unavailable at runtime (`PROF_CTL` is `None`). In that case
    //! the page renders with heap profiling disabled, and heap actions fail with
    //! `400 Bad Request` instead of panicking.

    use std::io::{BufReader, Read};
    use std::sync::Arc;

    use axum::extract::{Form, Query};
    use axum::response::IntoResponse;
    use axum_extra::TypedHeader;
    use bytesize::ByteSize;
    use headers::ContentType;
    use http::header::{CONTENT_DISPOSITION, HeaderMap};
    use http::{HeaderValue, StatusCode};
    use jemalloc_pprof::{JemallocProfCtl, PROF_CTL};
    use mappings::MAPPINGS;
    use mz_build_info::BuildInfo;
    use mz_ore::cast::CastFrom;
    use mz_prof::jemalloc::{JemallocProfCtlExt, JemallocStats};
    use mz_prof::{StackProfileExt, ever_symbolized};
    use pprof_util::{StackProfile, parse_jeheap};
    use serde::Deserialize;
    use tokio::sync::Mutex;

    use super::{MemProfilingStatus, ProfTemplate, flamegraph, time_prof};

    /// Form body accepted by `POST /`.
    #[derive(Deserialize)]
    pub struct ProfForm {
        /// One of `activate`, `deactivate`, `dump_jeheap`, `dump_sym_mzfg`, `mem_fg`, `time_fg`.
        action: String,
        /// `merge` sets `merge_threads` for `time_fg`.
        threads: Option<String>,
        /// CPU profiling duration in seconds; required by `time_fg`.
        time_secs: Option<u64>,
        /// CPU sampling frequency in Hz; required by `time_fg`.
        hz: Option<u32>,
    }

    /// Handles the profiling page's form actions.
    ///
    /// `time_fg` runs the CPU profiler and works whether or not heap profiling is available.
    /// Every other action needs the jemalloc profiling controller and answers
    /// `400 Bad Request` when it is unavailable. The dump/flamegraph actions additionally
    /// require heap profiling to be active (`403 Forbidden` otherwise).
    pub async fn handle_post(
        Form(ProfForm {
            action,
            threads,
            time_secs,
            hz,
        }): Form<ProfForm>,
        build_info: &'static BuildInfo,
    ) -> impl IntoResponse {
        let merge_threads = threads.as_deref() == Some("merge");

        match action.as_str() {
            "activate" => {
                let prof_ctl = require_prof_ctl()?;
                // Scoped so the lock is released before `render_template` re-acquires it.
                {
                    let mut borrow = prof_ctl.lock().await;
                    borrow.activate().map_err(internal_error)?;
                }
                Ok(render_template(Some(prof_ctl), build_info)
                    .await
                    .into_response())
            }
            "deactivate" => {
                let prof_ctl = require_prof_ctl()?;
                // Scoped so the lock is released before `render_template` re-acquires it.
                {
                    let mut borrow = prof_ctl.lock().await;
                    borrow.deactivate().map_err(internal_error)?;
                }
                Ok(render_template(Some(prof_ctl), build_info)
                    .await
                    .into_response())
            }
            "dump_jeheap" => {
                let prof_ctl = require_prof_ctl()?;
                let mut borrow = prof_ctl.lock().await;
                require_profiling_activated(&borrow)?;
                let mut f = borrow.dump().map_err(internal_error)?;
                let mut s = String::new();
                f.read_to_string(&mut s).map_err(internal_error)?;
                Ok((
                    attachment_headers("attachment; filename=\"jeprof.heap\""),
                    s,
                )
                    .into_response())
            }
            "dump_sym_mzfg" => {
                let (stacks, stats_rendered) =
                    heap_profile_with_stats(require_prof_ctl()?).await?;
                let mut header = stats_rendered
                    .iter()
                    .map(|(k, v)| (*k, v.as_str()))
                    .collect::<Vec<_>>();
                header.push(("display_bytes", "1"));
                let mzfg = stacks.to_mzfg(true, &header);
                Ok((
                    attachment_headers("attachment; filename=\"trace.mzfg\""),
                    mzfg,
                )
                    .into_response())
            }
            "mem_fg" => {
                let (stacks, stats_rendered) =
                    heap_profile_with_stats(require_prof_ctl()?).await?;
                let extras = stats_rendered
                    .iter()
                    .map(|(k, v)| (*k, v.as_str()))
                    .collect::<Vec<_>>();
                Ok(
                    flamegraph(stacks, "Heap Flamegraph", true, &extras, build_info)
                        .into_response(),
                )
            }
            "time_fg" => {
                let time_secs = require_param(time_secs, "time_secs")?;
                let hz = require_param(hz, "hz")?;
                Ok(time_prof(merge_threads, build_info, time_secs, hz)
                    .await
                    .into_response())
            }
            x => Err((
                StatusCode::BAD_REQUEST,
                format!("unrecognized `action` parameter: {}", x),
            )),
        }
    }

    /// Query string accepted by `GET /`.
    #[derive(Deserialize)]
    pub struct ProfQuery {
        /// `dump_stats` returns allocator statistics instead of the page.
        action: Option<String>,
    }

    /// Renders the profiling page.
    ///
    /// With `?action=dump_stats`, it instead returns the controller's `dump_stats` output. The
    /// output is JSON when the `Accept` header is exactly `application/json` and text
    /// otherwise. Any other action is a `400 Bad Request`.
    pub async fn handle_get(
        Query(query): Query<ProfQuery>,
        headers: HeaderMap,
        build_info: &'static BuildInfo,
    ) -> impl IntoResponse {
        match query.action.as_deref() {
            Some("dump_stats") => {
                let json = headers
                    .get("accept")
                    .is_some_and(|accept| accept.as_bytes() == b"application/json");
                let prof_ctl = require_prof_ctl()?;
                let mut borrow = prof_ctl.lock().await;
                let s = borrow.dump_stats(json).map_err(internal_error)?;
                let content_type = match json {
                    false => ContentType::text(),
                    true => ContentType::json(),
                };
                Ok((TypedHeader(content_type), s).into_response())
            }
            Some(x) => Err((
                StatusCode::BAD_REQUEST,
                format!("unrecognized query: {}", x),
            )),
            // Render the page even when heap profiling is unavailable; it then shows as disabled.
            None => Ok(render_template(PROF_CTL.as_ref(), build_info)
                .await
                .into_response()),
        }
    }

    /// Returns the current heap profile, encoded with `to_pprof`.
    ///
    /// # Errors
    ///
    /// * `400 Bad Request` if heap profiling is unavailable.
    /// * `403 Forbidden` if it is not activated.
    /// * `500 Internal Server Error` if dumping or parsing fails.
    pub async fn handle_get_heap() -> Result<impl IntoResponse, (StatusCode, String)> {
        let mut prof_ctl = require_prof_ctl()?.lock().await;
        let profile = dump_and_parse(&mut prof_ctl)?;
        let pprof = profile.to_pprof(("inuse_space", "bytes"), ("space", "bytes"), None);
        Ok(pprof)
    }

    /// Renders the profiling page.
    ///
    /// Heap profiling is reported as enabled (with the controller's start time) when a
    /// controller is given, and as disabled otherwise.
    async fn render_template(
        prof_ctl: Option<&Arc<Mutex<JemallocProfCtl>>>,
        build_info: &'static BuildInfo,
    ) -> impl IntoResponse {
        let mem_prof = match prof_ctl {
            Some(prof_ctl) => {
                let prof_md = prof_ctl.lock().await.get_md();
                MemProfilingStatus::Enabled(prof_md.start_time)
            }
            None => MemProfilingStatus::Disabled,
        };
        mz_http_util::template_response(ProfTemplate {
            version: build_info.version,
            executable: &super::EXECUTABLE,
            mem_prof,
            ever_symbolized: ever_symbolized(),
        })
    }

    /// Returns the jemalloc profiling controller.
    ///
    /// Returns a `400 Bad Request` response instead of panicking when heap profiling is not
    /// available in this process (`PROF_CTL` is `None`).
    fn require_prof_ctl() -> Result<&'static Arc<Mutex<JemallocProfCtl>>, (StatusCode, String)> {
        PROF_CTL.as_ref().ok_or_else(|| {
            (
                StatusCode::BAD_REQUEST,
                "jemalloc heap profiling is not available in this process".to_string(),
            )
        })
    }

    /// Checks whether jemalloc profiling is activated and returns an error response if not.
    fn require_profiling_activated(prof_ctl: &JemallocProfCtl) -> Result<(), (StatusCode, String)> {
        if prof_ctl.activated() {
            Ok(())
        } else {
            Err((StatusCode::FORBIDDEN, "heap profiling not activated".into()))
        }
    }

    /// Dumps the current heap profile and parses it against the process mappings.
    ///
    /// Fails with `403 Forbidden` if profiling is not activated.
    fn dump_and_parse(prof_ctl: &mut JemallocProfCtl) -> Result<StackProfile, (StatusCode, String)> {
        require_profiling_activated(prof_ctl)?;
        let dump_file = prof_ctl.dump().map_err(internal_error)?;
        parse_jeheap(BufReader::new(dump_file), MAPPINGS.as_deref()).map_err(internal_error)
    }

    /// Dumps and parses the heap profile and renders allocator statistics.
    ///
    /// The controller lock is held for the dump, parse and statistics read, and is released
    /// before returning.
    async fn heap_profile_with_stats(
        prof_ctl: &Mutex<JemallocProfCtl>,
    ) -> Result<(StackProfile, Vec<(&'static str, String)>), (StatusCode, String)> {
        let mut borrow = prof_ctl.lock().await;
        let stacks = dump_and_parse(&mut borrow)?;
        let stats = borrow.stats().map_err(internal_error)?;
        Ok((stacks, render_jemalloc_stats(&stats)))
    }

    /// Formats allocator statistics as human-readable `(label, size)` pairs.
    fn render_jemalloc_stats(stats: &JemallocStats) -> Vec<(&'static str, String)> {
        vec![
            (
                "Allocated",
                ByteSize(u64::cast_from(stats.allocated)).to_string_as(true),
            ),
            (
                "In active pages",
                ByteSize(u64::cast_from(stats.active)).to_string_as(true),
            ),
            (
                "Allocated for allocator metadata",
                ByteSize(u64::cast_from(stats.metadata)).to_string_as(true),
            ),
            (
                "Maximum number of bytes in physically resident data pages mapped by the allocator",
                ByteSize(u64::cast_from(stats.resident)).to_string_as(true),
            ),
            (
                "Bytes unused, but retained by allocator",
                ByteSize(u64::cast_from(stats.retained)).to_string_as(true),
            ),
        ]
    }

    /// Builds headers that make the response body download as a file.
    fn attachment_headers(content_disposition: &'static str) -> HeaderMap {
        HeaderMap::from_iter([(
            CONTENT_DISPOSITION,
            HeaderValue::from_static(content_disposition),
        )])
    }

    /// Unwraps a required form field, or yields a `400 Bad Request` naming the missing field.
    fn require_param<T>(value: Option<T>, name: &str) -> Result<T, (StatusCode, String)> {
        value.ok_or_else(|| {
            (
                StatusCode::BAD_REQUEST,
                format!("Expected value for `{name}`"),
            )
        })
    }

    /// Maps an internal failure to a `500 Internal Server Error` response carrying its message.
    fn internal_error<E: ToString>(e: E) -> (StatusCode, String) {
        (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
    }
}