Skip to main content

mz_timely_util/column_pager/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Prometheus metrics for the column pager.
17//!
18//! One process-wide [`PagerMetrics`] singleton, installed by compute init via
19//! [`register`]. Counter observers (`observe_*`) are no-ops until that call
20//! lands; the lazy initialization keeps tests and benches that don't wire a
21//! [`MetricsRegistry`] free of bookkeeping.
22
23use std::sync::OnceLock;
24
25use mz_ore::metric;
26use mz_ore::metrics::{ComputedUIntGauge, IntCounter, MetricsRegistry};
27
28use crate::column_pager::policy::TieredPolicy;
29
30/// Process-wide pager metrics. Counters track cumulative observations since
31/// process start; gauges read the live policy atomics at scrape time.
32#[derive(Debug)]
33pub struct PagerMetrics {
34    /// Number of decisions that kept the chunk resident.
35    pub skip_decisions_total: IntCounter,
36    /// Total bytes kept resident by skip decisions.
37    pub skip_bytes_total: IntCounter,
38    /// Number of decisions that paged the chunk out.
39    pub pageouts_total: IntCounter,
40    /// Uncompressed body bytes handed to the pager for pageout.
41    pub paged_bytes_in_total: IntCounter,
42    /// On-storage payload bytes after codec / padding.
43    pub paged_bytes_out_total: IntCounter,
44    /// Number of page-ins from `ColumnPager::take`.
45    pub pageins_total: IntCounter,
46    /// Total uncompressed bytes delivered by page-in.
47    pub pagein_bytes_total: IntCounter,
48    /// Resident-ticket drops returning bytes to the budget.
49    pub resident_released_total: IntCounter,
50    /// Total bytes returned to the budget by ticket drops.
51    pub resident_released_bytes_total: IntCounter,
52    // Computed gauges are registered with the registry but not held here —
53    // their collectors are owned by the prometheus registry.
54}
55
56static METRICS: OnceLock<PagerMetrics> = OnceLock::new();
57
58/// Install the pager metrics into `registry`. Idempotent — repeated calls
59/// after the first one are no-ops. Computed gauges read the singleton
60/// [`TieredPolicy`] atomics at scrape time; their values reflect the live
61/// policy whether or not the column-paged batcher is currently enabled.
62pub fn register(registry: &MetricsRegistry, policy: &'static TieredPolicy) {
63    let _ = METRICS.get_or_init(|| {
64        // Computed gauges: closures hold the &'static policy reference.
65        let _budget_remaining: ComputedUIntGauge = registry.register_computed_gauge(
66            metric!(
67                name: "mz_column_pager_budget_remaining_bytes",
68                help: "Bytes the column-pager tiered policy currently has \
69                       available for resident columns.",
70            ),
71            move || u64::try_from(policy.budget_remaining()).unwrap_or(u64::MAX),
72        );
73        let _budget_configured: ComputedUIntGauge = registry.register_computed_gauge(
74            metric!(
75                name: "mz_column_pager_budget_configured_bytes",
76                help: "Most-recently-configured total budget for the \
77                       column-pager tiered policy.",
78            ),
79            move || u64::try_from(policy.configured_total()).unwrap_or(u64::MAX),
80        );
81
82        PagerMetrics {
83            skip_decisions_total: registry.register(metric!(
84                name: "mz_column_pager_skip_decisions_total",
85                help: "Pager decisions that kept the chunk resident.",
86            )),
87            skip_bytes_total: registry.register(metric!(
88                name: "mz_column_pager_skip_bytes_total",
89                help: "Total bytes kept resident by skip decisions.",
90            )),
91            pageouts_total: registry.register(metric!(
92                name: "mz_column_pager_pageouts_total",
93                help: "Pager decisions that paged the chunk out.",
94            )),
95            paged_bytes_in_total: registry.register(metric!(
96                name: "mz_column_pager_paged_bytes_in_total",
97                help: "Total uncompressed bytes handed to the pager for \
98                       pageout, before any codec is applied.",
99            )),
100            paged_bytes_out_total: registry.register(metric!(
101                name: "mz_column_pager_paged_bytes_out_total",
102                help: "Total on-storage bytes after codec / padding.",
103            )),
104            pageins_total: registry.register(metric!(
105                name: "mz_column_pager_pageins_total",
106                help: "Successful page-ins from `ColumnPager::take`.",
107            )),
108            pagein_bytes_total: registry.register(metric!(
109                name: "mz_column_pager_pagein_bytes_total",
110                help: "Total uncompressed bytes delivered by page-in.",
111            )),
112            resident_released_total: registry.register(metric!(
113                name: "mz_column_pager_resident_released_total",
114                help: "Resident-ticket drops returning budget.",
115            )),
116            resident_released_bytes_total: registry.register(metric!(
117                name: "mz_column_pager_resident_released_bytes_total",
118                help: "Total bytes returned to the budget by ticket drops.",
119            )),
120        }
121    });
122}
123
124#[inline]
125fn metrics() -> Option<&'static PagerMetrics> {
126    METRICS.get()
127}
128
129pub(crate) fn observe_skip(bytes: usize) {
130    if let Some(m) = metrics() {
131        m.skip_decisions_total.inc();
132        m.skip_bytes_total.inc_by(bytes_to_u64(bytes));
133    }
134}
135
136pub(crate) fn observe_pageout(bytes_in: usize, bytes_out: usize) {
137    if let Some(m) = metrics() {
138        m.pageouts_total.inc();
139        m.paged_bytes_in_total.inc_by(bytes_to_u64(bytes_in));
140        m.paged_bytes_out_total.inc_by(bytes_to_u64(bytes_out));
141    }
142}
143
144pub(crate) fn observe_pagein(bytes: usize) {
145    if let Some(m) = metrics() {
146        m.pageins_total.inc();
147        m.pagein_bytes_total.inc_by(bytes_to_u64(bytes));
148    }
149}
150
151pub(crate) fn observe_resident_released(bytes: usize) {
152    if let Some(m) = metrics() {
153        m.resident_released_total.inc();
154        m.resident_released_bytes_total.inc_by(bytes_to_u64(bytes));
155    }
156}
157
/// Converts a byte count to `u64`, clamping to `u64::MAX` when the value does
/// not fit.
fn bytes_to_u64(b: usize) -> u64 {
    match u64::try_from(b) {
        Ok(exact) => exact,
        // Saturate rather than fail: callers only feed this into counters.
        Err(_) => u64::MAX,
    }
}