1use std::env;
13use std::sync::LazyLock;
14use std::time::Duration;
15
16use askama::Template;
17use axum::response::IntoResponse;
18use axum::routing::{self, Router};
19use cfg_if::cfg_if;
20use http::StatusCode;
21use mz_build_info::BuildInfo;
22use mz_prof::StackProfileExt;
23use pprof_util::{ProfStartTime, StackProfile};
24
25cfg_if! {
26 if #[cfg(any(not(feature = "jemalloc"), miri))] {
27 use disabled::{handle_get, handle_post, handle_get_heap};
28 } else {
29 use enabled::{handle_get, handle_post, handle_get_heap};
30 }
31}
32
33static EXECUTABLE: LazyLock<String> = LazyLock::new(|| {
34 {
35 env::current_exe()
36 .ok()
37 .as_ref()
38 .and_then(|exe| exe.file_name())
39 .map(|exe| exe.to_string_lossy().into_owned())
40 .unwrap_or_else(|| "<unknown executable>".into())
41 }
42});
43
44mz_http_util::make_handle_static!(
45 dir_1: ::include_dir::include_dir!("$CARGO_MANIFEST_DIR/src/http/static"),
46 dir_2: ::include_dir::include_dir!("$OUT_DIR/src/http/static"),
47 prod_base_path: "src/http/static",
48 dev_base_path: "src/http/static-dev",
49);
50
51pub fn router(build_info: &'static BuildInfo) -> Router {
53 Router::new()
54 .route(
55 "/",
56 routing::get(move |query, headers| handle_get(query, headers, build_info)),
57 )
58 .route(
59 "/",
60 routing::post(move |form| handle_post(form, build_info)),
61 )
62 .route("/heap", routing::get(handle_get_heap))
63 .route("/static/*path", routing::get(handle_static))
64}
65
66#[allow(dead_code)]
67enum MemProfilingStatus {
68 Disabled,
69 Enabled(Option<ProfStartTime>),
70}
71
72#[derive(Template)]
73#[template(path = "prof.html")]
74struct ProfTemplate<'a> {
75 version: &'a str,
76 executable: &'a str,
77 mem_prof: MemProfilingStatus,
78 ever_symbolized: bool,
79}
80
81#[derive(Template)]
82#[template(path = "flamegraph.html")]
83pub struct FlamegraphTemplate<'a> {
84 pub version: &'a str,
85 pub title: &'a str,
86 pub mzfg: &'a str,
87}
88
89#[allow(dropping_copy_types)]
90async fn time_prof(
91 merge_threads: bool,
92 build_info: &'static BuildInfo,
93 time_secs: u64,
95 sample_freq: u32,
97) -> impl IntoResponse + use<> {
98 let ctl_lock;
99 cfg_if! {
100 if #[cfg(any(not(feature = "jemalloc"), miri))] {
101 ctl_lock = ();
102 } else {
103 ctl_lock = if let Some(ctl) = jemalloc_pprof::PROF_CTL.as_ref() {
104 let mut borrow = ctl.lock().await;
105 borrow.deactivate().map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
106 Some(borrow)
107 } else {
108 None
109 };
110 }
111 }
112 let stacks = unsafe {
115 mz_prof::time::prof_time(Duration::from_secs(time_secs), sample_freq, merge_threads)
116 }
117 .await
118 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
119 drop(ctl_lock);
121 let (secs_s, freq_s) = (format!("{time_secs}"), format!("{sample_freq}"));
122 Ok::<_, (StatusCode, String)>(flamegraph(
123 stacks,
124 "CPU Time Flamegraph",
125 false,
126 &[
127 ("Sampling time (s)", &secs_s),
128 ("Sampling frequency (Hz)", &freq_s),
129 ],
130 build_info,
131 ))
132}
133
134fn flamegraph<'a, 'b>(
135 stacks: StackProfile,
136 title: &'a str,
137 display_bytes: bool,
138 extras: &'b [(&'b str, &'b str)],
139 build_info: &'static BuildInfo,
140) -> impl IntoResponse + use<'a> {
141 let mut header_extra = vec![];
142 if display_bytes {
143 header_extra.push(("display_bytes", "1"));
144 }
145 for (k, v) in extras {
146 header_extra.push((k, v));
147 }
148 let mzfg = stacks.to_mzfg(true, &header_extra);
149 mz_http_util::template_response(FlamegraphTemplate {
150 version: build_info.version,
151 title,
152 mzfg: &mzfg,
153 })
154}
155
156#[cfg(any(not(feature = "jemalloc"), miri))]
157mod disabled {
158 use axum::extract::{Form, Query};
159 use axum::response::IntoResponse;
160 use http::StatusCode;
161 use http::header::HeaderMap;
162 use mz_build_info::BuildInfo;
163 use serde::Deserialize;
164
165 use mz_prof::ever_symbolized;
166
167 use super::{MemProfilingStatus, ProfTemplate, time_prof};
168
169 #[derive(Deserialize)]
170 pub struct ProfQuery {
171 _action: Option<String>,
172 }
173
174 #[allow(clippy::unused_async)]
175 pub async fn handle_get(
176 _: Query<ProfQuery>,
177 _: HeaderMap,
178 build_info: &'static BuildInfo,
179 ) -> impl IntoResponse {
180 mz_http_util::template_response(ProfTemplate {
181 version: build_info.version,
182 executable: &super::EXECUTABLE,
183 mem_prof: MemProfilingStatus::Disabled,
184 ever_symbolized: ever_symbolized(),
185 })
186 }
187
188 #[derive(Deserialize)]
189 pub struct ProfForm {
190 action: String,
191 threads: Option<String>,
192 time_secs: Option<u64>,
193 hz: Option<u32>,
194 }
195
196 pub async fn handle_post(
197 Form(ProfForm {
198 action,
199 threads,
200 time_secs,
201 hz,
202 }): Form<ProfForm>,
203 build_info: &'static BuildInfo,
204 ) -> impl IntoResponse {
205 let merge_threads = threads.as_deref() == Some("merge");
206 match action.as_ref() {
207 "time_fg" => {
208 let time_secs = time_secs.ok_or_else(|| {
209 (
210 StatusCode::BAD_REQUEST,
211 "Expected value for `time_secs`".to_owned(),
212 )
213 })?;
214 let hz = hz.ok_or_else(|| {
215 (
216 StatusCode::BAD_REQUEST,
217 "Expected value for `hz`".to_owned(),
218 )
219 })?;
220
221 Ok(time_prof(merge_threads, build_info, time_secs, hz).await)
222 }
223 _ => Err((
224 StatusCode::BAD_REQUEST,
225 format!("unrecognized `action` parameter: {}", action),
226 )),
227 }
228 }
229
230 #[allow(clippy::unused_async)]
231 pub async fn handle_get_heap() -> Result<(), (StatusCode, String)> {
232 Err((
233 StatusCode::BAD_REQUEST,
234 "This software was compiled without heap profiling support.".to_string(),
235 ))
236 }
237}
238
239#[cfg(all(feature = "jemalloc", not(miri)))]
240mod enabled {
241 use std::io::{BufReader, Read};
242 use std::sync::Arc;
243
244 use axum::extract::{Form, Query};
245 use axum::response::IntoResponse;
246 use axum_extra::TypedHeader;
247 use bytesize::ByteSize;
248 use headers::ContentType;
249 use http::header::{CONTENT_DISPOSITION, HeaderMap};
250 use http::{HeaderValue, StatusCode};
251 use jemalloc_pprof::{JemallocProfCtl, PROF_CTL};
252 use mappings::MAPPINGS;
253 use mz_build_info::BuildInfo;
254 use mz_ore::cast::CastFrom;
255 use mz_prof::jemalloc::{JemallocProfCtlExt, JemallocStats};
256 use mz_prof::{StackProfileExt, ever_symbolized};
257 use pprof_util::parse_jeheap;
258 use serde::Deserialize;
259 use tokio::sync::Mutex;
260
261 use super::{MemProfilingStatus, ProfTemplate, flamegraph, time_prof};
262
263 #[derive(Deserialize)]
264 pub struct ProfForm {
265 action: String,
266 threads: Option<String>,
267 time_secs: Option<u64>,
268 hz: Option<u32>,
269 }
270
271 pub async fn handle_post(
272 Form(ProfForm {
273 action,
274 threads,
275 time_secs,
276 hz,
277 }): Form<ProfForm>,
278 build_info: &'static BuildInfo,
279 ) -> impl IntoResponse {
280 let prof_ctl = PROF_CTL.as_ref().unwrap();
281 let merge_threads = threads.as_deref() == Some("merge");
282
283 fn render_jemalloc_stats(stats: &JemallocStats) -> Vec<(&str, String)> {
284 let stats = [
285 ("Allocated", stats.allocated),
286 ("In active pages", stats.active),
287 ("Allocated for allocator metadata", stats.metadata),
288 (
289 "Maximum number of bytes in physically resident data pages mapped by the allocator",
290 stats.resident,
291 ),
292 ("Bytes unused, but retained by allocator", stats.retained),
293 ];
294 stats
295 .into_iter()
296 .map(|(k, v)| (k, ByteSize(u64::cast_from(v)).display().si().to_string()))
297 .collect()
298 }
299
300 match action.as_str() {
301 "activate" => {
302 {
303 let mut borrow = prof_ctl.lock().await;
304 borrow
305 .activate()
306 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
307 };
308 Ok(render_template(prof_ctl, build_info).await.into_response())
309 }
310 "deactivate" => {
311 {
312 let mut borrow = prof_ctl.lock().await;
313 borrow
314 .deactivate()
315 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
316 };
317 Ok(render_template(prof_ctl, build_info).await.into_response())
318 }
319 "dump_jeheap" => {
320 let mut borrow = prof_ctl.lock().await;
321 require_profiling_activated(&borrow)?;
322 let mut f = borrow
323 .dump()
324 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
325 let mut s = String::new();
326 f.read_to_string(&mut s)
327 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
328 Ok((
329 HeaderMap::from_iter([(
330 CONTENT_DISPOSITION,
331 HeaderValue::from_static("attachment; filename=\"jeprof.heap\""),
332 )]),
333 s,
334 )
335 .into_response())
336 }
337 "dump_sym_mzfg" => {
338 let mut borrow = prof_ctl.lock().await;
339 require_profiling_activated(&borrow)?;
340 let f = borrow
341 .dump()
342 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
343 let r = BufReader::new(f);
344 let stacks = parse_jeheap(r, MAPPINGS.as_deref())
345 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
346 let stats = borrow
347 .stats()
348 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
349 let stats_rendered = render_jemalloc_stats(&stats);
350 let mut header = stats_rendered
351 .iter()
352 .map(|(k, v)| (*k, v.as_str()))
353 .collect::<Vec<_>>();
354 header.push(("display_bytes", "1"));
355 let mzfg = stacks.to_mzfg(true, &header);
356 Ok((
357 HeaderMap::from_iter([(
358 CONTENT_DISPOSITION,
359 HeaderValue::from_static("attachment; filename=\"trace.mzfg\""),
360 )]),
361 mzfg,
362 )
363 .into_response())
364 }
365 "mem_fg" => {
366 let mut borrow = prof_ctl.lock().await;
367 require_profiling_activated(&borrow)?;
368 let f = borrow
369 .dump()
370 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
371 let r = BufReader::new(f);
372 let stacks = parse_jeheap(r, MAPPINGS.as_deref())
373 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
374 let stats = borrow
375 .stats()
376 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
377 let stats_rendered = render_jemalloc_stats(&stats);
378 let stats_rendered = stats_rendered
379 .iter()
380 .map(|(k, v)| (*k, v.as_str()))
381 .collect::<Vec<_>>();
382 Ok(
383 flamegraph(stacks, "Heap Flamegraph", true, &stats_rendered, build_info)
384 .into_response(),
385 )
386 }
387 "time_fg" => {
388 let time_secs = time_secs.ok_or_else(|| {
389 (
390 StatusCode::BAD_REQUEST,
391 "Expected value for `time_secs`".to_owned(),
392 )
393 })?;
394 let hz = hz.ok_or_else(|| {
395 (
396 StatusCode::BAD_REQUEST,
397 "Expected value for `hz`".to_owned(),
398 )
399 })?;
400 Ok(time_prof(merge_threads, build_info, time_secs, hz)
401 .await
402 .into_response())
403 }
404 x => Err((
405 StatusCode::BAD_REQUEST,
406 format!("unrecognized `action` parameter: {}", x),
407 )),
408 }
409 }
410
411 #[derive(Deserialize)]
412 pub struct ProfQuery {
413 action: Option<String>,
414 }
415
416 pub async fn handle_get(
417 Query(query): Query<ProfQuery>,
418 headers: HeaderMap,
419 build_info: &'static BuildInfo,
420 ) -> impl IntoResponse {
421 let prof_ctl = PROF_CTL.as_ref().unwrap();
422 match query.action.as_deref() {
423 Some("dump_stats") => {
424 let json = headers
425 .get("accept")
426 .map_or(false, |accept| accept.as_bytes() == b"application/json");
427 let mut borrow = prof_ctl.lock().await;
428 let s = borrow
429 .dump_stats(json)
430 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
431 let content_type = match json {
432 false => ContentType::text(),
433 true => ContentType::json(),
434 };
435 Ok((TypedHeader(content_type), s).into_response())
436 }
437 Some(x) => Err((
438 StatusCode::BAD_REQUEST,
439 format!("unrecognized query: {}", x),
440 )),
441 None => Ok(render_template(prof_ctl, build_info).await.into_response()),
442 }
443 }
444
445 pub async fn handle_get_heap() -> Result<impl IntoResponse, (StatusCode, String)> {
446 let mut prof_ctl = PROF_CTL.as_ref().unwrap().lock().await;
447 require_profiling_activated(&prof_ctl)?;
448 let dump_file = prof_ctl
449 .dump()
450 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?;
451 let dump_reader = BufReader::new(dump_file);
452 let profile = parse_jeheap(dump_reader, MAPPINGS.as_deref())
453 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?;
454 let pprof = profile.to_pprof(("inuse_space", "bytes"), ("space", "bytes"), None);
455 Ok(pprof)
456 }
457
458 async fn render_template(
459 prof_ctl: &Arc<Mutex<JemallocProfCtl>>,
460 build_info: &'static BuildInfo,
461 ) -> impl IntoResponse {
462 let prof_md = prof_ctl.lock().await.get_md();
463 mz_http_util::template_response(ProfTemplate {
464 version: build_info.version,
465 executable: &super::EXECUTABLE,
466 mem_prof: MemProfilingStatus::Enabled(prof_md.start_time),
467 ever_symbolized: ever_symbolized(),
468 })
469 }
470
471 fn require_profiling_activated(prof_ctl: &JemallocProfCtl) -> Result<(), (StatusCode, String)> {
473 if prof_ctl.activated() {
474 Ok(())
475 } else {
476 Err((StatusCode::FORBIDDEN, "heap profiling not activated".into()))
477 }
478 }
479}