mz_clusterd/
usage_metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Support for collecting system usage metrics.
11//!
//! Currently disk, memory, and swap usage are supported, along with the heap limit.
//! We may want to add CPU usage in the future.
14
15use std::path::PathBuf;
16
17use serde::Serialize;
18use tracing::{debug, error};
19
20/// A system usage metrics collector.
21pub(crate) struct Collector {
22    pub disk_root: Option<PathBuf>,
23}
24
25impl Collector {
26    /// Collect current system usage metrics.
27    pub fn collect(&self) -> Usage {
28        let disk_bytes = self.collect_disk_usage();
29        let (memory_bytes, swap_bytes) = collect_heap_usage();
30        let heap_limit = collect_heap_limit();
31
32        Usage {
33            disk_bytes,
34            memory_bytes,
35            swap_bytes,
36            heap_limit,
37        }
38    }
39
40    fn collect_disk_usage(&self) -> Option<u64> {
41        let Some(root) = &self.disk_root else {
42            return None;
43        };
44
45        let stat = match nix::sys::statvfs::statvfs(root) {
46            Ok(stat) => stat,
47            Err(err) => {
48                error!("statvfs error: {err}");
49                return None;
50            }
51        };
52
53        // `fsblkcnt_t` is a `u32` on macOS but a `u64` on Linux.
54        #[allow(clippy::useless_conversion)]
55        let used_blocks = u64::from(stat.blocks() - stat.blocks_available());
56        let used_bytes = used_blocks * stat.fragment_size();
57
58        debug!("disk usage: {used_bytes}");
59
60        Some(used_bytes)
61    }
62}
63
64/// A system usage measurement.
65#[derive(Serialize)]
66pub(crate) struct Usage {
67    disk_bytes: Option<u64>,
68    memory_bytes: Option<u64>,
69    swap_bytes: Option<u64>,
70    heap_limit: Option<u64>,
71}
72
73#[cfg(target_os = "linux")]
74mod linux {
75    use std::fs;
76    use std::path::Path;
77
78    use anyhow::{anyhow, bail};
79    use mz_compute::memory_limiter;
80    use mz_ore::cast::CastInto;
81    use tracing::{debug, error};
82
83    /// Collect memory and swap usage.
84    pub fn collect_heap_usage() -> (Option<u64>, Option<u64>) {
85        use mz_ore::cast::CastInto;
86
87        match memory_limiter::ProcStatus::from_proc() {
88            Ok(status) => {
89                let memory_bytes = status.vm_rss.cast_into();
90                let swap_bytes = status.vm_swap.cast_into();
91
92                debug!("memory usage: {memory_bytes}");
93                debug!("swap usage: {swap_bytes}");
94
95                (Some(memory_bytes), Some(swap_bytes))
96            }
97            Err(err) => {
98                error!("error reading /proc/self/status: {err}");
99                (None, None)
100            }
101        }
102    }
103
104    /// Collect the heap limit, i.e. memory + swap limit.
105    pub fn collect_heap_limit() -> Option<u64> {
106        // If we don't know the physical limits, we can't know the heap limit.
107        let (phys_mem_limit, phys_swap_limit) = get_physical_limits()?;
108
109        // Limits might be reduced by the cgroup.
110        let (cgroup_mem_limit, cgroup_swap_limit) = get_cgroup_limits();
111        let mem_limit = cgroup_mem_limit.unwrap_or(u64::MAX).min(phys_mem_limit);
112        let swap_limit = cgroup_swap_limit.unwrap_or(u64::MAX).min(phys_swap_limit);
113
114        let heap_limit = mem_limit + swap_limit;
115
116        // Heap limit might be reduced by the memory limiter.
117        let limiter_limit = memory_limiter::get_memory_limit().map(CastInto::cast_into);
118        let heap_limit = limiter_limit.unwrap_or(u64::MAX).min(heap_limit);
119
120        debug!("memory limit: {mem_limit} (phys={phys_mem_limit}, cgroup={cgroup_mem_limit:?})");
121        debug!("swap limit: {swap_limit} (phys={phys_swap_limit}, cgroup={cgroup_swap_limit:?})");
122        debug!("heap limit: {heap_limit} (limiter={limiter_limit:?})");
123
124        Some(heap_limit)
125    }
126
127    /// Helper for parsing `/proc/meminfo`.
128    struct ProcMemInfo {
129        mem_total: u64,
130        swap_total: u64,
131    }
132
133    impl ProcMemInfo {
134        fn from_proc() -> anyhow::Result<Self> {
135            let contents = fs::read_to_string("/proc/meminfo")?;
136
137            fn parse_kib_line(line: &str) -> anyhow::Result<u64> {
138                if let Some(kib) = line
139                    .split_whitespace()
140                    .nth(1)
141                    .and_then(|x| x.parse::<u64>().ok())
142                {
143                    Ok(kib * 1024)
144                } else {
145                    bail!("invalid meminfo line: {line}");
146                }
147            }
148
149            let mut memory = None;
150            let mut swap = None;
151            for line in contents.lines() {
152                if line.starts_with("MemTotal:") {
153                    memory = Some(parse_kib_line(line)?);
154                } else if line.starts_with("SwapTotal:") {
155                    swap = Some(parse_kib_line(line)?);
156                }
157            }
158
159            let mem_total = memory.ok_or_else(|| anyhow!("MemTotal not found"))?;
160            let swap_total = swap.ok_or_else(|| anyhow!("SwapTotal not found"))?;
161
162            Ok(Self {
163                mem_total,
164                swap_total,
165            })
166        }
167    }
168
169    /// Collect the physical memory and swap limits.
170    fn get_physical_limits() -> Option<(u64, u64)> {
171        let meminfo = match ProcMemInfo::from_proc() {
172            Ok(meminfo) => meminfo,
173            Err(error) => {
174                error!("reading `/proc/meminfo`: {error}");
175                return None;
176            }
177        };
178
179        Some((meminfo.mem_total, meminfo.swap_total))
180    }
181
182    /// Collect the memory and swap limits enforced by the current cgroup.
183    ///
184    /// We make the following simplifying assumptions that hold for a standard Kubernetes
185    /// environment:
186    //  * The current process is a member of a cgroups v2 hierarchy.
187    //  * The cgroups hierarchy is mounted at `/sys/fs/cgroup`.
188    //  * The limits are applied to the current cgroup directly (and not one of its ancestors).
189    fn get_cgroup_limits() -> (Option<u64>, Option<u64>) {
190        let Ok(proc_cgroup) = fs::read_to_string("/proc/self/cgroup") else {
191            return (None, None);
192        };
193
194        // Find the cgroups v2 hierarchy. Entries in `/proc/self/cgroup` have the form
195        // `hierarchy-id:controller-list:cgroup-path`. For cgroups v2, the first field is 0 and the
196        // second field is empty.
197        let mut lines = proc_cgroup.lines();
198        let Some(cgroup_path) = lines.find_map(|l| l.strip_prefix("0::")) else {
199            error!("invalid `/proc/self/cgroup` format: {proc_cgroup}");
200            return (None, None);
201        };
202
203        // The cgroup path is a relative path but may include a leading `/`. Strip that so
204        // `Path::join` works as expected.
205        let cgroup_path = cgroup_path.strip_prefix("/").unwrap_or(cgroup_path);
206
207        let root = Path::new("/sys/fs/cgroup").join(cgroup_path);
208        if !root.exists() {
209            error!("invalid cgroup root: {}", root.display());
210            return (None, None);
211        }
212
213        let memory_file = root.join("memory.max");
214        let swap_file = root.join("memory.swap.max");
215
216        let memory = fs::read_to_string(memory_file)
217            .ok()
218            .and_then(|s| s.trim().parse().ok());
219        let swap = fs::read_to_string(swap_file)
220            .ok()
221            .and_then(|s| s.trim().parse().ok());
222
223        (memory, swap)
224    }
225}
226
/// Implementations used on every target other than Linux.
#[cfg(not(target_os = "linux"))]
mod macos {
    use mz_compute::memory_limiter;
    use mz_ore::cast::CastInto;

    /// Memory and swap usage are not collected on this target; always `(None, None)`.
    pub fn collect_heap_usage() -> (Option<u64>, Option<u64>) {
        (None, None)
    }

    /// Report the memory limiter's limit as the heap limit, or `None` if it has none.
    pub fn collect_heap_limit() -> Option<u64> {
        let limit = memory_limiter::get_memory_limit()?;
        Some(limit.cast_into())
    }
}
240
241#[cfg(target_os = "linux")]
242use linux::*;
243#[cfg(not(target_os = "linux"))]
244use macos::*;