1use std::env;
13use std::sync::LazyLock;
14use std::time::Duration;
15
16use askama::Template;
17use axum::response::IntoResponse;
18use axum::routing::{self, Router};
19use cfg_if::cfg_if;
20use http::StatusCode;
21use mz_build_info::BuildInfo;
22use mz_prof::StackProfileExt;
23use pprof_util::{ProfStartTime, StackProfile};
24
25cfg_if! {
26 if #[cfg(any(not(feature = "jemalloc"), miri))] {
27 use disabled::{handle_get, handle_post, handle_get_heap};
28 } else {
29 use enabled::{handle_get, handle_post, handle_get_heap};
30 }
31}
32
/// File name of the currently running executable, computed once on first use.
///
/// Falls back to the literal `<unknown executable>` when the executable path
/// cannot be determined or has no final path component.
static EXECUTABLE: LazyLock<String> = LazyLock::new(|| match env::current_exe() {
    Ok(path) => match path.file_name() {
        // Non-UTF-8 bytes in the name are replaced rather than rejected.
        Some(name) => name.to_string_lossy().into_owned(),
        None => "<unknown executable>".into(),
    },
    Err(_) => "<unknown executable>".into(),
});
43
// Static asset handler for the profiling UI.
//
// The macro is given two directories embedded at compile time via
// `include_dir!` (one under the crate's manifest directory, one under the
// build's `OUT_DIR`) plus separate base paths for production and development.
//
// NOTE(review): presumably this expands to the `handle_static` handler that
// `router` mounts at `/static/{*path}`, and the dev base path is read from
// disk instead of the embedded copy — confirm against the macro definition in
// `mz_http_util`.
mz_http_util::make_handle_static!(
    dir_1: ::include_dir::include_dir!("$CARGO_MANIFEST_DIR/src/http/static"),
    dir_2: ::include_dir::include_dir!("$OUT_DIR/src/http/static"),
    prod_base_path: "src/http/static",
    dev_base_path: "src/http/static-dev",
);
50
51pub fn router(build_info: &'static BuildInfo) -> Router {
53 Router::new()
54 .route(
55 "/",
56 routing::get(move |query, headers| handle_get(query, headers, build_info)),
57 )
58 .route(
59 "/",
60 routing::post(move |form| handle_post(form, build_info)),
61 )
62 .route("/heap", routing::get(handle_get_heap))
63 .route("/static/{*path}", routing::get(handle_static))
64}
65
/// Heap profiling state handed to the `prof.html` template.
///
/// `dead_code` is allowed because any given build constructs only one of the
/// variants: the `disabled` module only ever creates `Disabled`, the `enabled`
/// module only ever creates `Enabled`.
#[allow(dead_code)]
enum MemProfilingStatus {
    /// Used by the handlers compiled when heap profiling support is absent.
    Disabled,
    /// Heap profiling support is compiled in. Carries the `start_time` taken
    /// from the profiler's metadata.
    // NOTE(review): `None` presumably means profiling is currently inactive —
    // confirm against `JemallocProfCtl::get_md`.
    Enabled(Option<ProfStartTime>),
}
71
/// Template context for the main profiling page (`prof.html`).
#[derive(Template)]
#[template(path = "prof.html")]
struct ProfTemplate<'a> {
    /// Build version string; handlers fill this from `BuildInfo::version`.
    version: &'a str,
    /// Name of the running executable; handlers fill this from `EXECUTABLE`.
    executable: &'a str,
    /// Whether heap profiling is available and, if so, its start time.
    mem_prof: MemProfilingStatus,
    /// Value returned by `mz_prof::ever_symbolized()` when the page is rendered.
    ever_symbolized: bool,
}
80
/// Template context for the flamegraph page (`flamegraph.html`).
#[derive(Template)]
#[template(path = "flamegraph.html")]
pub struct FlamegraphTemplate<'a> {
    /// Build version string.
    pub version: &'a str,
    /// Human-readable page title, e.g. `"CPU Time Flamegraph"`.
    pub title: &'a str,
    /// Profile data in `mzfg` form, as produced by `StackProfileExt::to_mzfg`.
    pub mzfg: &'a str,
}
88
/// Runs a CPU time profile and renders the result as a flamegraph page.
///
/// * `merge_threads` — forwarded unchanged to `mz_prof::time::prof_time`.
/// * `build_info` — supplies the version shown on the rendered page.
/// * `time_secs` — how long to profile, in seconds.
/// * `sample_freq` — sampling frequency, in Hz.
///
/// When jemalloc support is compiled in and a profiling controller exists,
/// heap profiling is deactivated first and the controller's lock is held for
/// the whole profiling run. Note that heap profiling is NOT re-activated
/// afterwards.
///
/// # Errors
///
/// Returns `500 Internal Server Error` with the error text if deactivating
/// heap profiling or collecting the time profile fails.
// `dropping_copy_types` is allowed because in builds without jemalloc
// `ctl_lock` is `()`, and the `drop(ctl_lock)` below would otherwise trip the
// lint.
#[allow(dropping_copy_types)]
async fn time_prof(
    merge_threads: bool,
    build_info: &'static BuildInfo,
    time_secs: u64,
    sample_freq: u32,
) -> impl IntoResponse + use<> {
    // Assigned exactly once in whichever `cfg_if!` branch is compiled.
    let ctl_lock;
    cfg_if! {
        if #[cfg(any(not(feature = "jemalloc"), miri))] {
            // No heap profiler in this build, so there is nothing to lock.
            ctl_lock = ();
        } else {
            ctl_lock = if let Some(ctl) = jemalloc_pprof::PROF_CTL.as_ref() {
                let mut borrow = ctl.lock().await;
                borrow.deactivate().map_err(|e| {
                    (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
                })?;
                // Keep the guard so no other handler can touch the controller
                // until the time profile has finished.
                Some(borrow)
            } else {
                None
            };
        }
    }
    // NOTE(review): `prof_time` requires `unsafe`. The code above turns heap
    // profiling off and keeps the controller locked until after this call,
    // which is presumably the precondition being upheld — confirm against the
    // safety documentation of `mz_prof::time::prof_time` and restore a proper
    // `SAFETY:` comment.
    let stacks = unsafe {
        mz_prof::time::prof_time(Duration::from_secs(time_secs), sample_freq, merge_threads)
    }
    .await
    .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
    // Explicit use of `ctl_lock` after the profiling run: the guard (if any)
    // must outlive the `unsafe` call above and is released only here.
    drop(ctl_lock);
    // Stringify the parameters so they can be shown in the flamegraph header.
    let (secs_s, freq_s) = (format!("{time_secs}"), format!("{sample_freq}"));
    Ok::<_, (StatusCode, String)>(flamegraph(
        stacks,
        "CPU Time Flamegraph",
        false,
        &[
            ("Sampling time (s)", &secs_s),
            ("Sampling frequency (Hz)", &freq_s),
        ],
        build_info,
    ))
}
135
136fn flamegraph<'a, 'b>(
137 stacks: StackProfile,
138 title: &'a str,
139 display_bytes: bool,
140 extras: &'b [(&'b str, &'b str)],
141 build_info: &'static BuildInfo,
142) -> impl IntoResponse + use<'a> {
143 let mut header_extra = vec![];
144 if display_bytes {
145 header_extra.push(("display_bytes", "1"));
146 }
147 for (k, v) in extras {
148 header_extra.push((k, v));
149 }
150 let mzfg = stacks.to_mzfg(true, &header_extra);
151 mz_http_util::template_response(FlamegraphTemplate {
152 version: build_info.version,
153 title,
154 mzfg: &mzfg,
155 })
156}
157
158#[cfg(any(not(feature = "jemalloc"), miri))]
159mod disabled {
160 use axum::extract::{Form, Query};
161 use axum::response::IntoResponse;
162 use http::StatusCode;
163 use http::header::HeaderMap;
164 use mz_build_info::BuildInfo;
165 use serde::Deserialize;
166
167 use mz_prof::ever_symbolized;
168
169 use super::{MemProfilingStatus, ProfTemplate, time_prof};
170
171 #[derive(Deserialize)]
172 pub struct ProfQuery {
173 _action: Option<String>,
174 }
175
176 #[allow(clippy::unused_async)]
177 pub async fn handle_get(
178 _: Query<ProfQuery>,
179 _: HeaderMap,
180 build_info: &'static BuildInfo,
181 ) -> impl IntoResponse {
182 mz_http_util::template_response(ProfTemplate {
183 version: build_info.version,
184 executable: &super::EXECUTABLE,
185 mem_prof: MemProfilingStatus::Disabled,
186 ever_symbolized: ever_symbolized(),
187 })
188 }
189
190 #[derive(Deserialize)]
191 pub struct ProfForm {
192 action: String,
193 threads: Option<String>,
194 time_secs: Option<u64>,
195 hz: Option<u32>,
196 }
197
198 pub async fn handle_post(
199 Form(ProfForm {
200 action,
201 threads,
202 time_secs,
203 hz,
204 }): Form<ProfForm>,
205 build_info: &'static BuildInfo,
206 ) -> impl IntoResponse {
207 let merge_threads = threads.as_deref() == Some("merge");
208 match action.as_ref() {
209 "time_fg" => {
210 let time_secs = time_secs.ok_or_else(|| {
211 (
212 StatusCode::BAD_REQUEST,
213 "Expected value for `time_secs`".to_owned(),
214 )
215 })?;
216 let hz = hz.ok_or_else(|| {
217 (
218 StatusCode::BAD_REQUEST,
219 "Expected value for `hz`".to_owned(),
220 )
221 })?;
222
223 Ok(time_prof(merge_threads, build_info, time_secs, hz).await)
224 }
225 _ => Err((
226 StatusCode::BAD_REQUEST,
227 format!("unrecognized `action` parameter: {}", action),
228 )),
229 }
230 }
231
232 #[allow(clippy::unused_async)]
233 pub async fn handle_get_heap() -> Result<(), (StatusCode, String)> {
234 Err((
235 StatusCode::BAD_REQUEST,
236 "This software was compiled without heap profiling support.".to_string(),
237 ))
238 }
239}
240
/// Handlers used when the crate is built with jemalloc heap profiling support.
#[cfg(all(feature = "jemalloc", not(miri)))]
mod enabled {
    use std::io::{BufReader, Read};
    use std::sync::Arc;

    use axum::extract::{Form, Query};
    use axum::response::IntoResponse;
    use axum_extra::TypedHeader;
    use bytesize::ByteSize;
    use headers::ContentType;
    use http::header::{CONTENT_DISPOSITION, HeaderMap};
    use http::{HeaderValue, StatusCode};
    use jemalloc_pprof::{JemallocProfCtl, PROF_CTL};
    use mappings::MAPPINGS;
    use mz_build_info::BuildInfo;
    use mz_ore::cast::CastFrom;
    use mz_prof::jemalloc::{JemallocProfCtlExt, JemallocStats};
    use mz_prof::{StackProfileExt, ever_symbolized};
    use pprof_util::parse_jeheap;
    use serde::Deserialize;
    use tokio::sync::Mutex;

    use super::{MemProfilingStatus, ProfTemplate, flamegraph, time_prof};

    /// Error half of every handler result in this module: an HTTP status plus
    /// a plain-text message.
    type HandlerError = (StatusCode, String);

    /// Maps any displayable error to a `500 Internal Server Error` response.
    fn internal_error(err: impl std::fmt::Display) -> HandlerError {
        (StatusCode::INTERNAL_SERVER_ERROR, err.to_string())
    }

    /// Returns the process-wide jemalloc profiling controller.
    ///
    /// `PROF_CTL` holds an `Option`, and `time_prof` in the parent module
    /// already treats `None` as a normal case. Reporting the absence as an
    /// HTTP error keeps a request from panicking the handler task.
    fn global_prof_ctl() -> Result<&'static Arc<Mutex<JemallocProfCtl>>, HandlerError> {
        PROF_CTL.as_ref().ok_or_else(|| {
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                "jemalloc profiling control is not available in this process".to_owned(),
            )
        })
    }

    /// Builds response headers that make the browser download the body as a
    /// file. `disposition` is the complete `Content-Disposition` value.
    fn attachment_headers(disposition: &'static str) -> HeaderMap {
        HeaderMap::from_iter([(CONTENT_DISPOSITION, HeaderValue::from_static(disposition))])
    }

    /// Checks that profiling is active, dumps the heap profile and parses it
    /// into stacks, resolving addresses against `MAPPINGS` when present.
    fn dump_stacks(ctl: &mut JemallocProfCtl) -> Result<pprof_util::StackProfile, HandlerError> {
        require_profiling_activated(ctl)?;
        let dump_file = ctl.dump().map_err(internal_error)?;
        parse_jeheap(BufReader::new(dump_file), MAPPINGS.as_deref()).map_err(internal_error)
    }

    /// Turns allocator statistics into `(label, human-readable size)` pairs
    /// for display in a flamegraph header. Sizes use SI units.
    fn render_jemalloc_stats(stats: &JemallocStats) -> Vec<(&'static str, String)> {
        [
            ("Allocated", stats.allocated),
            ("In active pages", stats.active),
            ("Allocated for allocator metadata", stats.metadata),
            (
                "Maximum number of bytes in physically resident data pages mapped by the allocator",
                stats.resident,
            ),
            ("Bytes unused, but retained by allocator", stats.retained),
        ]
        .into_iter()
        .map(|(label, bytes)| {
            (
                label,
                ByteSize(u64::cast_from(bytes)).display().si().to_string(),
            )
        })
        .collect()
    }

    /// Form body of `POST /`.
    #[derive(Deserialize)]
    pub struct ProfForm {
        /// Requested operation; see [`handle_post`] for the accepted values.
        action: String,
        /// `"merge"` turns on thread merging for `time_fg`; anything else
        /// leaves it off.
        threads: Option<String>,
        /// Required for `time_fg`.
        time_secs: Option<u64>,
        /// Required for `time_fg`.
        hz: Option<u32>,
    }

    /// Handles `POST /`, dispatching on the form's `action` field:
    ///
    /// * `activate` / `deactivate` — toggle heap profiling, then re-render the
    ///   profiling page.
    /// * `dump_jeheap` — download the raw heap dump as `jeprof.heap`.
    /// * `dump_sym_mzfg` — download the parsed heap profile as `trace.mzfg`.
    /// * `mem_fg` — render the heap profile as a flamegraph page.
    /// * `time_fg` — run a CPU time profile (needs `time_secs` and `hz`).
    ///
    /// # Errors
    ///
    /// * `400 Bad Request` for an unknown `action` or missing `time_fg` fields.
    /// * `403 Forbidden` when a heap dump is requested while heap profiling is
    ///   not activated.
    /// * `500 Internal Server Error` when the profiling controller is
    ///   unavailable or an underlying profiler operation fails.
    pub async fn handle_post(
        Form(ProfForm {
            action,
            threads,
            time_secs,
            hz,
        }): Form<ProfForm>,
        build_info: &'static BuildInfo,
    ) -> impl IntoResponse {
        let merge_threads = threads.as_deref() == Some("merge");

        // The controller is looked up inside each arm that needs it, so that
        // `time_fg` keeps working even when no controller exists.
        match action.as_str() {
            "activate" => {
                let prof_ctl = global_prof_ctl()?;
                // The guard must be released before `render_template` locks
                // the same mutex again.
                {
                    let mut ctl = prof_ctl.lock().await;
                    ctl.activate().map_err(internal_error)?;
                }
                Ok(render_template(prof_ctl, build_info).await.into_response())
            }
            "deactivate" => {
                let prof_ctl = global_prof_ctl()?;
                // Same locking discipline as `activate`.
                {
                    let mut ctl = prof_ctl.lock().await;
                    ctl.deactivate().map_err(internal_error)?;
                }
                Ok(render_template(prof_ctl, build_info).await.into_response())
            }
            "dump_jeheap" => {
                let mut ctl = global_prof_ctl()?.lock().await;
                require_profiling_activated(&ctl)?;
                let mut dump_file = ctl.dump().map_err(internal_error)?;
                let mut contents = String::new();
                dump_file
                    .read_to_string(&mut contents)
                    .map_err(internal_error)?;
                Ok((
                    attachment_headers("attachment; filename=\"jeprof.heap\""),
                    contents,
                )
                    .into_response())
            }
            "dump_sym_mzfg" => {
                let mut ctl = global_prof_ctl()?.lock().await;
                let stacks = dump_stacks(&mut ctl)?;
                let stats = ctl.stats().map_err(internal_error)?;
                let stats_rendered = render_jemalloc_stats(&stats);
                let mut header = stats_rendered
                    .iter()
                    .map(|(k, v)| (*k, v.as_str()))
                    .collect::<Vec<_>>();
                header.push(("display_bytes", "1"));
                let mzfg = stacks.to_mzfg(true, &header);
                Ok((
                    attachment_headers("attachment; filename=\"trace.mzfg\""),
                    mzfg,
                )
                    .into_response())
            }
            "mem_fg" => {
                let mut ctl = global_prof_ctl()?.lock().await;
                let stacks = dump_stacks(&mut ctl)?;
                let stats = ctl.stats().map_err(internal_error)?;
                let stats_rendered = render_jemalloc_stats(&stats);
                let stats_rendered = stats_rendered
                    .iter()
                    .map(|(k, v)| (*k, v.as_str()))
                    .collect::<Vec<_>>();
                Ok(
                    flamegraph(stacks, "Heap Flamegraph", true, &stats_rendered, build_info)
                        .into_response(),
                )
            }
            "time_fg" => {
                let time_secs = time_secs.ok_or_else(|| {
                    (
                        StatusCode::BAD_REQUEST,
                        "Expected value for `time_secs`".to_owned(),
                    )
                })?;
                let hz = hz.ok_or_else(|| {
                    (
                        StatusCode::BAD_REQUEST,
                        "Expected value for `hz`".to_owned(),
                    )
                })?;
                Ok(time_prof(merge_threads, build_info, time_secs, hz)
                    .await
                    .into_response())
            }
            x => Err((
                StatusCode::BAD_REQUEST,
                format!("unrecognized `action` parameter: {}", x),
            )),
        }
    }

    /// Query string of `GET /`.
    #[derive(Deserialize)]
    pub struct ProfQuery {
        /// Optional operation; only `"dump_stats"` is recognized.
        action: Option<String>,
    }

    /// Handles `GET /`.
    ///
    /// Without an `action` the profiling page is rendered. With
    /// `action=dump_stats` the allocator statistics are returned, as JSON when
    /// the `Accept` header is exactly `application/json` and as plain text
    /// otherwise.
    ///
    /// # Errors
    ///
    /// * `400 Bad Request` for any other `action`.
    /// * `500 Internal Server Error` when the profiling controller is
    ///   unavailable or dumping the statistics fails.
    pub async fn handle_get(
        Query(query): Query<ProfQuery>,
        headers: HeaderMap,
        build_info: &'static BuildInfo,
    ) -> impl IntoResponse {
        match query.action.as_deref() {
            Some("dump_stats") => {
                let json = headers
                    .get("accept")
                    .is_some_and(|accept| accept.as_bytes() == b"application/json");
                let mut ctl = global_prof_ctl()?.lock().await;
                let s = ctl.dump_stats(json).map_err(internal_error)?;
                let content_type = if json {
                    ContentType::json()
                } else {
                    ContentType::text()
                };
                Ok((TypedHeader(content_type), s).into_response())
            }
            Some(x) => Err((
                StatusCode::BAD_REQUEST,
                format!("unrecognized query: {}", x),
            )),
            None => Ok(render_template(global_prof_ctl()?, build_info)
                .await
                .into_response()),
        }
    }

    /// Handles `GET /heap`: dumps the current heap profile and returns it in
    /// pprof form with sample type `inuse_space`/`bytes` and period type
    /// `space`/`bytes`.
    ///
    /// # Errors
    ///
    /// * `403 Forbidden` when heap profiling is not activated.
    /// * `500 Internal Server Error` when the profiling controller is
    ///   unavailable or the dump cannot be produced or parsed.
    pub async fn handle_get_heap() -> Result<impl IntoResponse, (StatusCode, String)> {
        let mut ctl = global_prof_ctl()?.lock().await;
        let profile = dump_stacks(&mut ctl)?;
        Ok(profile.to_pprof(("inuse_space", "bytes"), ("space", "bytes"), None))
    }

    /// Renders the profiling page using the controller's current metadata.
    ///
    /// Locks `prof_ctl` briefly, so callers must not hold its guard.
    async fn render_template(
        prof_ctl: &Arc<Mutex<JemallocProfCtl>>,
        build_info: &'static BuildInfo,
    ) -> impl IntoResponse {
        let prof_md = prof_ctl.lock().await.get_md();
        mz_http_util::template_response(ProfTemplate {
            version: build_info.version,
            executable: &super::EXECUTABLE,
            mem_prof: MemProfilingStatus::Enabled(prof_md.start_time),
            ever_symbolized: ever_symbolized(),
        })
    }

    /// Returns `403 Forbidden` unless heap profiling is currently activated.
    fn require_profiling_activated(prof_ctl: &JemallocProfCtl) -> Result<(), (StatusCode, String)> {
        if prof_ctl.activated() {
            Ok(())
        } else {
            Err((StatusCode::FORBIDDEN, "heap profiling not activated".into()))
        }
    }
}