1use std::env;
13use std::sync::LazyLock;
14use std::time::Duration;
15
16use askama::Template;
17use axum::response::IntoResponse;
18use axum::routing::{self, Router};
19use cfg_if::cfg_if;
20use http::StatusCode;
21use mz_build_info::BuildInfo;
22use mz_prof::StackProfileExt;
23use pprof_util::{ProfStartTime, StackProfile};
24
25cfg_if! {
26 if #[cfg(any(not(feature = "jemalloc"), miri))] {
27 use disabled::{handle_get, handle_post, handle_get_heap};
28 } else {
29 use enabled::{handle_get, handle_post, handle_get_heap};
30 }
31}
32
33static EXECUTABLE: LazyLock<String> = LazyLock::new(|| {
34 {
35 env::current_exe()
36 .ok()
37 .as_ref()
38 .and_then(|exe| exe.file_name())
39 .map(|exe| exe.to_string_lossy().into_owned())
40 .unwrap_or_else(|| "<unknown executable>".into())
41 }
42});
43
44mz_http_util::make_handle_static!(
45 dir_1: ::include_dir::include_dir!("$CARGO_MANIFEST_DIR/src/http/static"),
46 dir_2: ::include_dir::include_dir!("$OUT_DIR/src/http/static"),
47 prod_base_path: "src/http/static",
48 dev_base_path: "src/http/static-dev",
49);
50
51pub fn router(build_info: &'static BuildInfo) -> Router {
53 Router::new()
54 .route(
55 "/",
56 routing::get(move |query, headers| handle_get(query, headers, build_info)),
57 )
58 .route(
59 "/",
60 routing::post(move |form| handle_post(form, build_info)),
61 )
62 .route("/heap", routing::get(handle_get_heap))
63 .route("/static/*path", routing::get(handle_static))
64}
65
66#[allow(dead_code)]
67enum MemProfilingStatus {
68 Disabled,
69 Enabled(Option<ProfStartTime>),
70}
71
72#[derive(Template)]
73#[template(path = "prof.html")]
74struct ProfTemplate<'a> {
75 version: &'a str,
76 executable: &'a str,
77 mem_prof: MemProfilingStatus,
78 ever_symbolized: bool,
79}
80
81#[derive(Template)]
82#[template(path = "flamegraph.html")]
83pub struct FlamegraphTemplate<'a> {
84 pub version: &'a str,
85 pub title: &'a str,
86 pub mzfg: &'a str,
87}
88
89#[allow(dropping_copy_types)]
90async fn time_prof(
91 merge_threads: bool,
92 build_info: &'static BuildInfo,
93 time_secs: u64,
95 sample_freq: u32,
97) -> impl IntoResponse + use<> {
98 let ctl_lock;
99 cfg_if! {
100 if #[cfg(any(not(feature = "jemalloc"), miri))] {
101 ctl_lock = ();
102 } else {
103 ctl_lock = if let Some(ctl) = jemalloc_pprof::PROF_CTL.as_ref() {
104 let mut borrow = ctl.lock().await;
105 borrow.deactivate().map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
106 Some(borrow)
107 } else {
108 None
109 };
110 }
111 }
112 let stacks = unsafe {
115 mz_prof::time::prof_time(Duration::from_secs(time_secs), sample_freq, merge_threads)
116 }
117 .await
118 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
119 drop(ctl_lock);
121 let (secs_s, freq_s) = (format!("{time_secs}"), format!("{sample_freq}"));
122 Ok::<_, (StatusCode, String)>(flamegraph(
123 stacks,
124 "CPU Time Flamegraph",
125 false,
126 &[
127 ("Sampling time (s)", &secs_s),
128 ("Sampling frequency (Hz)", &freq_s),
129 ],
130 build_info,
131 ))
132}
133
134fn flamegraph<'a, 'b>(
135 stacks: StackProfile,
136 title: &'a str,
137 display_bytes: bool,
138 extras: &'b [(&'b str, &'b str)],
139 build_info: &'static BuildInfo,
140) -> impl IntoResponse + use<'a> {
141 let mut header_extra = vec![];
142 if display_bytes {
143 header_extra.push(("display_bytes", "1"));
144 }
145 for (k, v) in extras {
146 header_extra.push((k, v));
147 }
148 let mzfg = stacks.to_mzfg(true, &header_extra);
149 mz_http_util::template_response(FlamegraphTemplate {
150 version: build_info.version,
151 title,
152 mzfg: &mzfg,
153 })
154}
155
156#[cfg(any(not(feature = "jemalloc"), miri))]
157mod disabled {
158 use axum::extract::{Form, Query};
159 use axum::response::IntoResponse;
160 use http::StatusCode;
161 use http::header::HeaderMap;
162 use mz_build_info::BuildInfo;
163 use serde::Deserialize;
164
165 use mz_prof::ever_symbolized;
166
167 use super::{MemProfilingStatus, ProfTemplate, time_prof};
168
169 #[derive(Deserialize)]
170 pub struct ProfQuery {
171 _action: Option<String>,
172 }
173
174 #[allow(clippy::unused_async)]
175 pub async fn handle_get(
176 _: Query<ProfQuery>,
177 _: HeaderMap,
178 build_info: &'static BuildInfo,
179 ) -> impl IntoResponse {
180 mz_http_util::template_response(ProfTemplate {
181 version: build_info.version,
182 executable: &super::EXECUTABLE,
183 mem_prof: MemProfilingStatus::Disabled,
184 ever_symbolized: ever_symbolized(),
185 })
186 }
187
188 #[derive(Deserialize)]
189 pub struct ProfForm {
190 action: String,
191 threads: Option<String>,
192 time_secs: Option<u64>,
193 hz: Option<u32>,
194 }
195
196 pub async fn handle_post(
197 Form(ProfForm {
198 action,
199 threads,
200 time_secs,
201 hz,
202 }): Form<ProfForm>,
203 build_info: &'static BuildInfo,
204 ) -> impl IntoResponse {
205 let merge_threads = threads.as_deref() == Some("merge");
206 match action.as_ref() {
207 "time_fg" => {
208 let time_secs = time_secs.ok_or_else(|| {
209 (
210 StatusCode::BAD_REQUEST,
211 "Expected value for `time_secs`".to_owned(),
212 )
213 })?;
214 let hz = hz.ok_or_else(|| {
215 (
216 StatusCode::BAD_REQUEST,
217 "Expected value for `hz`".to_owned(),
218 )
219 })?;
220
221 Ok(time_prof(merge_threads, build_info, time_secs, hz).await)
222 }
223 _ => Err((
224 StatusCode::BAD_REQUEST,
225 format!("unrecognized `action` parameter: {}", action),
226 )),
227 }
228 }
229
230 #[allow(clippy::unused_async)]
231 pub async fn handle_get_heap() -> Result<(), (StatusCode, String)> {
232 Err((
233 StatusCode::BAD_REQUEST,
234 "This software was compiled without heap profiling support.".to_string(),
235 ))
236 }
237}
238
239#[cfg(all(feature = "jemalloc", not(miri)))]
240mod enabled {
241 use std::io::{BufReader, Read};
242 use std::sync::Arc;
243
244 use axum::extract::{Form, Query};
245 use axum::response::IntoResponse;
246 use axum_extra::TypedHeader;
247 use bytesize::ByteSize;
248 use headers::ContentType;
249 use http::header::{CONTENT_DISPOSITION, HeaderMap};
250 use http::{HeaderValue, StatusCode};
251 use jemalloc_pprof::{JemallocProfCtl, PROF_CTL};
252 use mappings::MAPPINGS;
253 use mz_build_info::BuildInfo;
254 use mz_ore::cast::CastFrom;
255 use mz_prof::jemalloc::{JemallocProfCtlExt, JemallocStats};
256 use mz_prof::{StackProfileExt, ever_symbolized};
257 use pprof_util::parse_jeheap;
258 use serde::Deserialize;
259 use tokio::sync::Mutex;
260
261 use super::{MemProfilingStatus, ProfTemplate, flamegraph, time_prof};
262
263 #[derive(Deserialize)]
264 pub struct ProfForm {
265 action: String,
266 threads: Option<String>,
267 time_secs: Option<u64>,
268 hz: Option<u32>,
269 }
270
271 pub async fn handle_post(
272 Form(ProfForm {
273 action,
274 threads,
275 time_secs,
276 hz,
277 }): Form<ProfForm>,
278 build_info: &'static BuildInfo,
279 ) -> impl IntoResponse {
280 let prof_ctl = PROF_CTL.as_ref().unwrap();
281 let merge_threads = threads.as_deref() == Some("merge");
282
283 fn render_jemalloc_stats(stats: &JemallocStats) -> Vec<(&str, String)> {
284 vec![
285 (
286 "Allocated",
287 ByteSize(u64::cast_from(stats.allocated)).to_string_as(true),
288 ),
289 (
290 "In active pages",
291 ByteSize(u64::cast_from(stats.active)).to_string_as(true),
292 ),
293 (
294 "Allocated for allocator metadata",
295 ByteSize(u64::cast_from(stats.metadata)).to_string_as(true),
296 ),
297 (
298 "Maximum number of bytes in physically resident data pages mapped by the allocator",
299 ByteSize(u64::cast_from(stats.resident)).to_string_as(true),
300 ),
301 (
302 "Bytes unused, but retained by allocator",
303 ByteSize(u64::cast_from(stats.retained)).to_string_as(true),
304 ),
305 ]
306 }
307
308 match action.as_str() {
309 "activate" => {
310 {
311 let mut borrow = prof_ctl.lock().await;
312 borrow
313 .activate()
314 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
315 };
316 Ok(render_template(prof_ctl, build_info).await.into_response())
317 }
318 "deactivate" => {
319 {
320 let mut borrow = prof_ctl.lock().await;
321 borrow
322 .deactivate()
323 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
324 };
325 Ok(render_template(prof_ctl, build_info).await.into_response())
326 }
327 "dump_jeheap" => {
328 let mut borrow = prof_ctl.lock().await;
329 require_profiling_activated(&borrow)?;
330 let mut f = borrow
331 .dump()
332 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
333 let mut s = String::new();
334 f.read_to_string(&mut s)
335 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
336 Ok((
337 HeaderMap::from_iter([(
338 CONTENT_DISPOSITION,
339 HeaderValue::from_static("attachment; filename=\"jeprof.heap\""),
340 )]),
341 s,
342 )
343 .into_response())
344 }
345 "dump_sym_mzfg" => {
346 let mut borrow = prof_ctl.lock().await;
347 require_profiling_activated(&borrow)?;
348 let f = borrow
349 .dump()
350 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
351 let r = BufReader::new(f);
352 let stacks = parse_jeheap(r, MAPPINGS.as_deref())
353 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
354 let stats = borrow
355 .stats()
356 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
357 let stats_rendered = render_jemalloc_stats(&stats);
358 let mut header = stats_rendered
359 .iter()
360 .map(|(k, v)| (*k, v.as_str()))
361 .collect::<Vec<_>>();
362 header.push(("display_bytes", "1"));
363 let mzfg = stacks.to_mzfg(true, &header);
364 Ok((
365 HeaderMap::from_iter([(
366 CONTENT_DISPOSITION,
367 HeaderValue::from_static("attachment; filename=\"trace.mzfg\""),
368 )]),
369 mzfg,
370 )
371 .into_response())
372 }
373 "mem_fg" => {
374 let mut borrow = prof_ctl.lock().await;
375 require_profiling_activated(&borrow)?;
376 let f = borrow
377 .dump()
378 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
379 let r = BufReader::new(f);
380 let stacks = parse_jeheap(r, MAPPINGS.as_deref())
381 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
382 let stats = borrow
383 .stats()
384 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
385 let stats_rendered = render_jemalloc_stats(&stats);
386 let stats_rendered = stats_rendered
387 .iter()
388 .map(|(k, v)| (*k, v.as_str()))
389 .collect::<Vec<_>>();
390 Ok(
391 flamegraph(stacks, "Heap Flamegraph", true, &stats_rendered, build_info)
392 .into_response(),
393 )
394 }
395 "time_fg" => {
396 let time_secs = time_secs.ok_or_else(|| {
397 (
398 StatusCode::BAD_REQUEST,
399 "Expected value for `time_secs`".to_owned(),
400 )
401 })?;
402 let hz = hz.ok_or_else(|| {
403 (
404 StatusCode::BAD_REQUEST,
405 "Expected value for `hz`".to_owned(),
406 )
407 })?;
408 Ok(time_prof(merge_threads, build_info, time_secs, hz)
409 .await
410 .into_response())
411 }
412 x => Err((
413 StatusCode::BAD_REQUEST,
414 format!("unrecognized `action` parameter: {}", x),
415 )),
416 }
417 }
418
419 #[derive(Deserialize)]
420 pub struct ProfQuery {
421 action: Option<String>,
422 }
423
424 pub async fn handle_get(
425 Query(query): Query<ProfQuery>,
426 headers: HeaderMap,
427 build_info: &'static BuildInfo,
428 ) -> impl IntoResponse {
429 let prof_ctl = PROF_CTL.as_ref().unwrap();
430 match query.action.as_deref() {
431 Some("dump_stats") => {
432 let json = headers
433 .get("accept")
434 .map_or(false, |accept| accept.as_bytes() == b"application/json");
435 let mut borrow = prof_ctl.lock().await;
436 let s = borrow
437 .dump_stats(json)
438 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
439 let content_type = match json {
440 false => ContentType::text(),
441 true => ContentType::json(),
442 };
443 Ok((TypedHeader(content_type), s).into_response())
444 }
445 Some(x) => Err((
446 StatusCode::BAD_REQUEST,
447 format!("unrecognized query: {}", x),
448 )),
449 None => Ok(render_template(prof_ctl, build_info).await.into_response()),
450 }
451 }
452
453 pub async fn handle_get_heap() -> Result<impl IntoResponse, (StatusCode, String)> {
454 let mut prof_ctl = PROF_CTL.as_ref().unwrap().lock().await;
455 require_profiling_activated(&prof_ctl)?;
456 let dump_file = prof_ctl
457 .dump()
458 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?;
459 let dump_reader = BufReader::new(dump_file);
460 let profile = parse_jeheap(dump_reader, MAPPINGS.as_deref())
461 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?;
462 let pprof = profile.to_pprof(("inuse_space", "bytes"), ("space", "bytes"), None);
463 Ok(pprof)
464 }
465
466 async fn render_template(
467 prof_ctl: &Arc<Mutex<JemallocProfCtl>>,
468 build_info: &'static BuildInfo,
469 ) -> impl IntoResponse {
470 let prof_md = prof_ctl.lock().await.get_md();
471 mz_http_util::template_response(ProfTemplate {
472 version: build_info.version,
473 executable: &super::EXECUTABLE,
474 mem_prof: MemProfilingStatus::Enabled(prof_md.start_time),
475 ever_symbolized: ever_symbolized(),
476 })
477 }
478
479 fn require_profiling_activated(prof_ctl: &JemallocProfCtl) -> Result<(), (StatusCode, String)> {
481 if prof_ctl.activated() {
482 Ok(())
483 } else {
484 Err((StatusCode::FORBIDDEN, "heap profiling not activated".into()))
485 }
486 }
487}