Skip to main content

mz_prof_http/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Profiling HTTP endpoints.
11
12use std::env;
13use std::sync::LazyLock;
14use std::time::Duration;
15
16use askama::Template;
17use axum::response::IntoResponse;
18use axum::routing::{self, Router};
19use cfg_if::cfg_if;
20use http::StatusCode;
21use mz_build_info::BuildInfo;
22use mz_prof::StackProfileExt;
23use pprof_util::{ProfStartTime, StackProfile};
24
// Select the handler implementations at compile time: the `disabled` module
// when the `jemalloc` feature is off or when building under Miri, and the
// `enabled` module otherwise. Both modules export the same three handler
// names, so `router` is written once against whichever set is imported here.
cfg_if! {
    if #[cfg(any(not(feature = "jemalloc"), miri))] {
        use disabled::{handle_get, handle_post, handle_get_heap};
    } else {
        use enabled::{handle_get, handle_post, handle_get_heap};
    }
}
32
/// File name of the currently running executable, computed once on first use.
///
/// Falls back to the placeholder `"<unknown executable>"` when the executable
/// path cannot be determined or has no final path component.
static EXECUTABLE: LazyLock<String> = LazyLock::new(|| {
    let exe_path = env::current_exe().ok();
    match exe_path.as_deref().and_then(|path| path.file_name()) {
        Some(name) => name.to_string_lossy().into_owned(),
        None => "<unknown executable>".into(),
    }
});
43
// Embeds the static assets under `src/http/static`, taken both from the crate
// source tree (`$CARGO_MANIFEST_DIR`) and from the build output directory
// (`$OUT_DIR`).
// NOTE(review): `handle_static`, which `router` mounts at `/static/{*path}`,
// is not defined anywhere else in this file, so it is presumably generated by
// this macro — confirm against `mz_http_util::make_handle_static!`.
mz_http_util::make_handle_static!(
    dir_1: ::include_dir::include_dir!("$CARGO_MANIFEST_DIR/src/http/static"),
    dir_2: ::include_dir::include_dir!("$OUT_DIR/src/http/static"),
    prod_base_path: "src/http/static",
    dev_base_path: "src/http/static-dev",
);
50
51/// Creates a router that serves the profiling endpoints.
52pub fn router(build_info: &'static BuildInfo) -> Router {
53    Router::new()
54        .route(
55            "/",
56            routing::get(move |query, headers| handle_get(query, headers, build_info)),
57        )
58        .route(
59            "/",
60            routing::post(move |form| handle_post(form, build_info)),
61        )
62        .route("/heap", routing::get(handle_get_heap))
63        .route("/static/{*path}", routing::get(handle_static))
64}
65
/// Heap (memory) profiling state handed to the `prof.html` template.
// Not every build configuration constructs every variant (for example, the
// `disabled` module only ever builds `Disabled`), hence the `dead_code`
// allowance.
#[allow(dead_code)]
enum MemProfilingStatus {
    /// Heap profiling is not available.
    Disabled,
    /// Heap profiling is available. Carries the `start_time` read from the
    /// jemalloc profiling metadata.
    // NOTE(review): presumably `None` means profiling is currently inactive —
    // confirm against `ProfStartTime` / the template.
    Enabled(Option<ProfStartTime>),
}
71
/// Template context for the profiling index page (`prof.html`).
#[derive(Template)]
#[template(path = "prof.html")]
struct ProfTemplate<'a> {
    /// Build version string; the handlers fill this from `BuildInfo::version`.
    version: &'a str,
    /// Name of the running executable; the handlers fill this from `EXECUTABLE`.
    executable: &'a str,
    /// Whether heap profiling is available and, if so, its start time.
    mem_prof: MemProfilingStatus,
    /// Value returned by `mz_prof::ever_symbolized()` when the page is rendered.
    ever_symbolized: bool,
}
80
/// Template context for the flamegraph page (`flamegraph.html`).
#[derive(Template)]
#[template(path = "flamegraph.html")]
pub struct FlamegraphTemplate<'a> {
    /// Build version string; within this file it comes from `BuildInfo::version`.
    pub version: &'a str,
    /// Title of the flamegraph, e.g. `"CPU Time Flamegraph"`.
    pub title: &'a str,
    /// Profile data in mzfg form; within this file it is the output of
    /// `StackProfileExt::to_mzfg`.
    pub mzfg: &'a str,
}
88
89#[allow(dropping_copy_types)]
90async fn time_prof(
91    merge_threads: bool,
92    build_info: &'static BuildInfo,
93    // the time in seconds to run the profiler for
94    time_secs: u64,
95    // the sampling frequency in Hz
96    sample_freq: u32,
97) -> impl IntoResponse + use<> {
98    let ctl_lock;
99    cfg_if! {
100        if #[cfg(any(not(feature = "jemalloc"), miri))] {
101            ctl_lock = ();
102        } else {
103            ctl_lock = if let Some(ctl) = jemalloc_pprof::PROF_CTL.as_ref() {
104                let mut borrow = ctl.lock().await;
105                borrow.deactivate().map_err(|e| {
106                    (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
107                })?;
108                Some(borrow)
109            } else {
110                None
111            };
112        }
113    }
114    // SAFETY: We ensure above that memory profiling is off.
115    // Since we hold the mutex, nobody else can be turning it back on in the intervening time.
116    let stacks = unsafe {
117        mz_prof::time::prof_time(Duration::from_secs(time_secs), sample_freq, merge_threads)
118    }
119    .await
120    .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
121    // Fail with a compile error if we weren't holding the jemalloc lock.
122    drop(ctl_lock);
123    let (secs_s, freq_s) = (format!("{time_secs}"), format!("{sample_freq}"));
124    Ok::<_, (StatusCode, String)>(flamegraph(
125        stacks,
126        "CPU Time Flamegraph",
127        false,
128        &[
129            ("Sampling time (s)", &secs_s),
130            ("Sampling frequency (Hz)", &freq_s),
131        ],
132        build_info,
133    ))
134}
135
136fn flamegraph<'a, 'b>(
137    stacks: StackProfile,
138    title: &'a str,
139    display_bytes: bool,
140    extras: &'b [(&'b str, &'b str)],
141    build_info: &'static BuildInfo,
142) -> impl IntoResponse + use<'a> {
143    let mut header_extra = vec![];
144    if display_bytes {
145        header_extra.push(("display_bytes", "1"));
146    }
147    for (k, v) in extras {
148        header_extra.push((k, v));
149    }
150    let mzfg = stacks.to_mzfg(true, &header_extra);
151    mz_http_util::template_response(FlamegraphTemplate {
152        version: build_info.version,
153        title,
154        mzfg: &mzfg,
155    })
156}
157
158#[cfg(any(not(feature = "jemalloc"), miri))]
159mod disabled {
160    use axum::extract::{Form, Query};
161    use axum::response::IntoResponse;
162    use http::StatusCode;
163    use http::header::HeaderMap;
164    use mz_build_info::BuildInfo;
165    use serde::Deserialize;
166
167    use mz_prof::ever_symbolized;
168
169    use super::{MemProfilingStatus, ProfTemplate, time_prof};
170
171    #[derive(Deserialize)]
172    pub struct ProfQuery {
173        _action: Option<String>,
174    }
175
176    #[allow(clippy::unused_async)]
177    pub async fn handle_get(
178        _: Query<ProfQuery>,
179        _: HeaderMap,
180        build_info: &'static BuildInfo,
181    ) -> impl IntoResponse {
182        mz_http_util::template_response(ProfTemplate {
183            version: build_info.version,
184            executable: &super::EXECUTABLE,
185            mem_prof: MemProfilingStatus::Disabled,
186            ever_symbolized: ever_symbolized(),
187        })
188    }
189
190    #[derive(Deserialize)]
191    pub struct ProfForm {
192        action: String,
193        threads: Option<String>,
194        time_secs: Option<u64>,
195        hz: Option<u32>,
196    }
197
198    pub async fn handle_post(
199        Form(ProfForm {
200            action,
201            threads,
202            time_secs,
203            hz,
204        }): Form<ProfForm>,
205        build_info: &'static BuildInfo,
206    ) -> impl IntoResponse {
207        let merge_threads = threads.as_deref() == Some("merge");
208        match action.as_ref() {
209            "time_fg" => {
210                let time_secs = time_secs.ok_or_else(|| {
211                    (
212                        StatusCode::BAD_REQUEST,
213                        "Expected value for `time_secs`".to_owned(),
214                    )
215                })?;
216                let hz = hz.ok_or_else(|| {
217                    (
218                        StatusCode::BAD_REQUEST,
219                        "Expected value for `hz`".to_owned(),
220                    )
221                })?;
222
223                Ok(time_prof(merge_threads, build_info, time_secs, hz).await)
224            }
225            _ => Err((
226                StatusCode::BAD_REQUEST,
227                format!("unrecognized `action` parameter: {}", action),
228            )),
229        }
230    }
231
232    #[allow(clippy::unused_async)]
233    pub async fn handle_get_heap() -> Result<(), (StatusCode, String)> {
234        Err((
235            StatusCode::BAD_REQUEST,
236            "This software was compiled without heap profiling support.".to_string(),
237        ))
238    }
239}
240
/// Handlers used when heap profiling support (jemalloc) is compiled in.
///
/// The jemalloc profiling control handle (`PROF_CTL`) is optional. Handlers
/// that need it answer with an error response when it is absent instead of
/// panicking; CPU time profiling (`time_fg`) and the index page work without
/// it.
#[cfg(all(feature = "jemalloc", not(miri)))]
mod enabled {
    use std::io::{BufReader, Read};
    use std::sync::Arc;

    use axum::extract::{Form, Query};
    use axum::response::IntoResponse;
    use axum_extra::TypedHeader;
    use bytesize::ByteSize;
    use headers::ContentType;
    use http::header::{CONTENT_DISPOSITION, HeaderMap};
    use http::{HeaderValue, StatusCode};
    use jemalloc_pprof::{JemallocProfCtl, PROF_CTL};
    use mappings::MAPPINGS;
    use mz_build_info::BuildInfo;
    use mz_ore::cast::CastFrom;
    use mz_prof::jemalloc::{JemallocProfCtlExt, JemallocStats};
    use mz_prof::{StackProfileExt, ever_symbolized};
    use pprof_util::parse_jeheap;
    use serde::Deserialize;
    use tokio::sync::Mutex;

    use super::{MemProfilingStatus, ProfTemplate, flamegraph, time_prof};

    /// Error half of every handler result in this module: a status code plus
    /// a plain-text message.
    type HandlerError = (StatusCode, String);

    /// Maps any displayable error to a `500 Internal Server Error` response.
    fn internal_error<E: ToString>(err: E) -> HandlerError {
        (StatusCode::INTERNAL_SERVER_ERROR, err.to_string())
    }

    /// Returns the jemalloc profiling control handle, or a `400 Bad Request`
    /// error when the process has none.
    ///
    /// `time_prof` already treats a missing handle as a valid state; the
    /// handlers here must not panic on it either. The status code mirrors what
    /// `GET /heap` returns in builds without heap profiling support.
    fn heap_prof_ctl() -> Result<&'static Arc<Mutex<JemallocProfCtl>>, HandlerError> {
        PROF_CTL.as_ref().ok_or_else(|| {
            (
                StatusCode::BAD_REQUEST,
                "heap profiling is not available in this process".to_owned(),
            )
        })
    }

    /// Builds headers that make the response download as an attachment.
    /// `disposition` is the complete `Content-Disposition` header value.
    fn attachment_headers(disposition: &'static str) -> HeaderMap {
        HeaderMap::from_iter([(CONTENT_DISPOSITION, HeaderValue::from_static(disposition))])
    }

    /// Renders the allocator statistics as human-readable (SI) byte sizes,
    /// paired with their labels.
    fn render_jemalloc_stats(stats: &JemallocStats) -> Vec<(&'static str, String)> {
        let stats = [
            ("Allocated", stats.allocated),
            ("In active pages", stats.active),
            ("Allocated for allocator metadata", stats.metadata),
            (
                "Maximum number of bytes in physically resident data pages mapped by the allocator",
                stats.resident,
            ),
            ("Bytes unused, but retained by allocator", stats.retained),
        ];
        stats
            .into_iter()
            .map(|(k, v)| (k, ByteSize(u64::cast_from(v)).display().si().to_string()))
            .collect()
    }

    /// Dumps the current heap profile and parses it into a stack profile.
    ///
    /// Fails with `403 Forbidden` when profiling is not activated and with
    /// `500 Internal Server Error` when dumping or parsing fails.
    fn dump_stacks(ctl: &mut JemallocProfCtl) -> Result<pprof_util::StackProfile, HandlerError> {
        require_profiling_activated(ctl)?;
        let dump_file = ctl.dump().map_err(internal_error)?;
        parse_jeheap(BufReader::new(dump_file), MAPPINGS.as_deref()).map_err(internal_error)
    }

    /// Form fields accepted by `POST /`.
    #[derive(Deserialize)]
    pub struct ProfForm {
        /// Requested operation; see [`handle_post`] for the accepted values.
        action: String,
        /// `"merge"` sets `merge_threads` for the CPU profile.
        threads: Option<String>,
        /// Profiling duration in seconds; required for `time_fg`.
        time_secs: Option<u64>,
        /// Sampling frequency in Hz; required for `time_fg`.
        hz: Option<u32>,
    }

    /// Handles `POST /`.
    ///
    /// Recognized `action` values:
    /// * `activate` / `deactivate` — toggle heap profiling, then re-render the
    ///   index page.
    /// * `dump_jeheap` — download the raw heap dump as `jeprof.heap`.
    /// * `dump_sym_mzfg` — download the parsed heap profile as `trace.mzfg`.
    /// * `mem_fg` — render the heap profile as a flamegraph.
    /// * `time_fg` — run a CPU time profile (requires `time_secs` and `hz`).
    ///
    /// Any other action yields `400 Bad Request`. Every action except
    /// `time_fg` needs the jemalloc profiling control handle and reports an
    /// error when it is absent.
    pub async fn handle_post(
        Form(ProfForm {
            action,
            threads,
            time_secs,
            hz,
        }): Form<ProfForm>,
        build_info: &'static BuildInfo,
    ) -> impl IntoResponse {
        let merge_threads = threads.as_deref() == Some("merge");

        match action.as_str() {
            "activate" => {
                let prof_ctl = heap_prof_ctl()?;
                {
                    // Release the lock before rendering: `render_template`
                    // takes it again.
                    let mut borrow = prof_ctl.lock().await;
                    borrow.activate().map_err(internal_error)?;
                }
                Ok(render_template(Some(prof_ctl), build_info)
                    .await
                    .into_response())
            }
            "deactivate" => {
                let prof_ctl = heap_prof_ctl()?;
                {
                    // Release the lock before rendering: `render_template`
                    // takes it again.
                    let mut borrow = prof_ctl.lock().await;
                    borrow.deactivate().map_err(internal_error)?;
                }
                Ok(render_template(Some(prof_ctl), build_info)
                    .await
                    .into_response())
            }
            "dump_jeheap" => {
                let mut borrow = heap_prof_ctl()?.lock().await;
                require_profiling_activated(&borrow)?;
                let mut f = borrow.dump().map_err(internal_error)?;
                let mut s = String::new();
                f.read_to_string(&mut s).map_err(internal_error)?;
                Ok((
                    attachment_headers("attachment; filename=\"jeprof.heap\""),
                    s,
                )
                    .into_response())
            }
            "dump_sym_mzfg" => {
                let mut borrow = heap_prof_ctl()?.lock().await;
                let stacks = dump_stacks(&mut borrow)?;
                let stats = borrow.stats().map_err(internal_error)?;
                let stats_rendered = render_jemalloc_stats(&stats);
                let mut header = stats_rendered
                    .iter()
                    .map(|(k, v)| (*k, v.as_str()))
                    .collect::<Vec<_>>();
                header.push(("display_bytes", "1"));
                let mzfg = stacks.to_mzfg(true, &header);
                Ok((
                    attachment_headers("attachment; filename=\"trace.mzfg\""),
                    mzfg,
                )
                    .into_response())
            }
            "mem_fg" => {
                let mut borrow = heap_prof_ctl()?.lock().await;
                let stacks = dump_stacks(&mut borrow)?;
                let stats = borrow.stats().map_err(internal_error)?;
                let stats_rendered = render_jemalloc_stats(&stats);
                let stats_rendered = stats_rendered
                    .iter()
                    .map(|(k, v)| (*k, v.as_str()))
                    .collect::<Vec<_>>();
                Ok(
                    flamegraph(stacks, "Heap Flamegraph", true, &stats_rendered, build_info)
                        .into_response(),
                )
            }
            "time_fg" => {
                // CPU profiling does not need the heap profiling handle;
                // `time_prof` copes with its absence on its own.
                let time_secs = time_secs.ok_or_else(|| {
                    (
                        StatusCode::BAD_REQUEST,
                        "Expected value for `time_secs`".to_owned(),
                    )
                })?;
                let hz = hz.ok_or_else(|| {
                    (
                        StatusCode::BAD_REQUEST,
                        "Expected value for `hz`".to_owned(),
                    )
                })?;
                Ok(time_prof(merge_threads, build_info, time_secs, hz)
                    .await
                    .into_response())
            }
            x => Err((
                StatusCode::BAD_REQUEST,
                format!("unrecognized `action` parameter: {}", x),
            )),
        }
    }

    /// Query string accepted by `GET /`.
    #[derive(Deserialize)]
    pub struct ProfQuery {
        /// Optional operation; only `dump_stats` is recognized.
        action: Option<String>,
    }

    /// Handles `GET /`.
    ///
    /// * No `action` — renders the index page. Heap profiling is shown as
    ///   disabled when the process has no profiling control handle.
    /// * `action=dump_stats` — returns the allocator statistics, as JSON when
    ///   the `Accept` header is exactly `application/json`, as text otherwise.
    /// * Any other action — `400 Bad Request`.
    pub async fn handle_get(
        Query(query): Query<ProfQuery>,
        headers: HeaderMap,
        build_info: &'static BuildInfo,
    ) -> impl IntoResponse {
        match query.action.as_deref() {
            Some("dump_stats") => {
                let json = headers
                    .get("accept")
                    .is_some_and(|accept| accept.as_bytes() == b"application/json");
                let mut borrow = heap_prof_ctl()?.lock().await;
                let s = borrow.dump_stats(json).map_err(internal_error)?;
                let content_type = if json {
                    ContentType::json()
                } else {
                    ContentType::text()
                };
                Ok((TypedHeader(content_type), s).into_response())
            }
            Some(x) => Err((
                StatusCode::BAD_REQUEST,
                format!("unrecognized query: {}", x),
            )),
            None => Ok(render_template(PROF_CTL.as_ref(), build_info)
                .await
                .into_response()),
        }
    }

    /// Handles `GET /heap`: dumps the current heap profile and returns it in
    /// pprof form.
    ///
    /// Fails with `400 Bad Request` when heap profiling is unavailable, with
    /// `403 Forbidden` when it is not activated, and with
    /// `500 Internal Server Error` when dumping or parsing fails.
    pub async fn handle_get_heap() -> Result<impl IntoResponse, (StatusCode, String)> {
        let mut prof_ctl = heap_prof_ctl()?.lock().await;
        let profile = dump_stacks(&mut prof_ctl)?;
        let pprof = profile.to_pprof(("inuse_space", "bytes"), ("space", "bytes"), None);
        Ok(pprof)
    }

    /// Renders the profiling index page.
    ///
    /// With a control handle, the page reports the profiling start time read
    /// from its metadata; without one, heap profiling is shown as disabled.
    async fn render_template(
        prof_ctl: Option<&Arc<Mutex<JemallocProfCtl>>>,
        build_info: &'static BuildInfo,
    ) -> impl IntoResponse {
        let mem_prof = match prof_ctl {
            Some(ctl) => {
                let prof_md = ctl.lock().await.get_md();
                MemProfilingStatus::Enabled(prof_md.start_time)
            }
            None => MemProfilingStatus::Disabled,
        };
        mz_http_util::template_response(ProfTemplate {
            version: build_info.version,
            executable: &super::EXECUTABLE,
            mem_prof,
            ever_symbolized: ever_symbolized(),
        })
    }

    /// Checks whether jemalloc profiling is activated and returns an error response if not.
    fn require_profiling_activated(prof_ctl: &JemallocProfCtl) -> Result<(), (StatusCode, String)> {
        if prof_ctl.activated() {
            Ok(())
        } else {
            Err((StatusCode::FORBIDDEN, "heap profiling not activated".into()))
        }
    }
}