mz_clusterd/
usage_metrics.rs1use std::path::PathBuf;
16
17use serde::Serialize;
18use tracing::{debug, error};
19
20pub(crate) struct Collector {
22 pub disk_root: Option<PathBuf>,
23}
24
25impl Collector {
26 pub fn collect(&self) -> Usage {
28 let disk_bytes = self.collect_disk_usage();
29 let (memory_bytes, swap_bytes) = collect_heap_usage();
30 let heap_limit = collect_heap_limit();
31
32 Usage {
33 disk_bytes,
34 memory_bytes,
35 swap_bytes,
36 heap_limit,
37 }
38 }
39
40 fn collect_disk_usage(&self) -> Option<u64> {
41 let Some(root) = &self.disk_root else {
42 return None;
43 };
44
45 let stat = match nix::sys::statvfs::statvfs(root) {
46 Ok(stat) => stat,
47 Err(err) => {
48 error!("statvfs error: {err}");
49 return None;
50 }
51 };
52
53 #[allow(clippy::useless_conversion)]
55 let used_blocks = u64::from(stat.blocks() - stat.blocks_available());
56 let used_bytes = used_blocks * stat.fragment_size();
57
58 debug!("disk usage: {used_bytes}");
59
60 Some(used_bytes)
61 }
62}
63
64#[derive(Serialize)]
66pub(crate) struct Usage {
67 disk_bytes: Option<u64>,
68 memory_bytes: Option<u64>,
69 swap_bytes: Option<u64>,
70 heap_limit: Option<u64>,
71}
72
73#[cfg(target_os = "linux")]
74mod linux {
75 use std::fs;
76 use std::path::Path;
77
78 use anyhow::{anyhow, bail};
79 use mz_compute::memory_limiter;
80 use mz_ore::cast::CastInto;
81 use tracing::{debug, error};
82
83 pub fn collect_heap_usage() -> (Option<u64>, Option<u64>) {
85 use mz_ore::cast::CastInto;
86
87 match memory_limiter::ProcStatus::from_proc() {
88 Ok(status) => {
89 let memory_bytes = status.vm_rss.cast_into();
90 let swap_bytes = status.vm_swap.cast_into();
91
92 debug!("memory usage: {memory_bytes}");
93 debug!("swap usage: {swap_bytes}");
94
95 (Some(memory_bytes), Some(swap_bytes))
96 }
97 Err(err) => {
98 error!("error reading /proc/self/status: {err}");
99 (None, None)
100 }
101 }
102 }
103
104 pub fn collect_heap_limit() -> Option<u64> {
106 let (phys_mem_limit, phys_swap_limit) = get_physical_limits()?;
108
109 let (cgroup_mem_limit, cgroup_swap_limit) = get_cgroup_limits();
111 let mem_limit = cgroup_mem_limit.unwrap_or(u64::MAX).min(phys_mem_limit);
112 let swap_limit = cgroup_swap_limit.unwrap_or(u64::MAX).min(phys_swap_limit);
113
114 let heap_limit = mem_limit + swap_limit;
115
116 let limiter_limit = memory_limiter::get_memory_limit().map(CastInto::cast_into);
118 let heap_limit = limiter_limit.unwrap_or(u64::MAX).min(heap_limit);
119
120 debug!("memory limit: {mem_limit} (phys={phys_mem_limit}, cgroup={cgroup_mem_limit:?})");
121 debug!("swap limit: {swap_limit} (phys={phys_swap_limit}, cgroup={cgroup_swap_limit:?})");
122 debug!("heap limit: {heap_limit} (limiter={limiter_limit:?})");
123
124 Some(heap_limit)
125 }
126
127 struct ProcMemInfo {
129 mem_total: u64,
130 swap_total: u64,
131 }
132
133 impl ProcMemInfo {
134 fn from_proc() -> anyhow::Result<Self> {
135 let contents = fs::read_to_string("/proc/meminfo")?;
136
137 fn parse_kib_line(line: &str) -> anyhow::Result<u64> {
138 if let Some(kib) = line
139 .split_whitespace()
140 .nth(1)
141 .and_then(|x| x.parse::<u64>().ok())
142 {
143 Ok(kib * 1024)
144 } else {
145 bail!("invalid meminfo line: {line}");
146 }
147 }
148
149 let mut memory = None;
150 let mut swap = None;
151 for line in contents.lines() {
152 if line.starts_with("MemTotal:") {
153 memory = Some(parse_kib_line(line)?);
154 } else if line.starts_with("SwapTotal:") {
155 swap = Some(parse_kib_line(line)?);
156 }
157 }
158
159 let mem_total = memory.ok_or_else(|| anyhow!("MemTotal not found"))?;
160 let swap_total = swap.ok_or_else(|| anyhow!("SwapTotal not found"))?;
161
162 Ok(Self {
163 mem_total,
164 swap_total,
165 })
166 }
167 }
168
169 fn get_physical_limits() -> Option<(u64, u64)> {
171 let meminfo = match ProcMemInfo::from_proc() {
172 Ok(meminfo) => meminfo,
173 Err(error) => {
174 error!("reading `/proc/meminfo`: {error}");
175 return None;
176 }
177 };
178
179 Some((meminfo.mem_total, meminfo.swap_total))
180 }
181
182 fn get_cgroup_limits() -> (Option<u64>, Option<u64>) {
190 let Ok(proc_cgroup) = fs::read_to_string("/proc/self/cgroup") else {
191 return (None, None);
192 };
193
194 let mut lines = proc_cgroup.lines();
198 let Some(cgroup_path) = lines.find_map(|l| l.strip_prefix("0::")) else {
199 error!("invalid `/proc/self/cgroup` format: {proc_cgroup}");
200 return (None, None);
201 };
202
203 let cgroup_path = cgroup_path.strip_prefix("/").unwrap_or(cgroup_path);
206
207 let root = Path::new("/sys/fs/cgroup").join(cgroup_path);
208 if !root.exists() {
209 error!("invalid cgroup root: {}", root.display());
210 return (None, None);
211 }
212
213 let memory_file = root.join("memory.max");
214 let swap_file = root.join("memory.swap.max");
215
216 let memory = fs::read_to_string(memory_file)
217 .ok()
218 .and_then(|s| s.trim().parse().ok());
219 let swap = fs::read_to_string(swap_file)
220 .ok()
221 .and_then(|s| s.trim().parse().ok());
222
223 (memory, swap)
224 }
225}
226
#[cfg(not(target_os = "linux"))]
mod macos {
    //! Fallback collectors for every non-Linux target, not only macOS. Heap usage is not
    //! collected, and the heap limit is taken solely from the memory limiter.

    use mz_compute::memory_limiter;
    use mz_ore::cast::CastInto;

    /// Heap usage is not collected on this platform; always returns `(None, None)`.
    pub fn collect_heap_usage() -> (Option<u64>, Option<u64>) {
        (None, None)
    }

    /// Returns the memory limiter's configured limit, or `None` when no limit is set.
    pub fn collect_heap_limit() -> Option<u64> {
        let limit = memory_limiter::get_memory_limit()?;
        Some(limit.cast_into())
    }
}
240
241#[cfg(target_os = "linux")]
242use linux::*;
243#[cfg(not(target_os = "linux"))]
244use macos::*;