mz_ore/
stats.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Statistics utilities.
17
/// The canonical bucket boundaries for timing histograms, measured in seconds.
///
/// Boundaries double from 8 microseconds up to 8192 seconds, except that the sequence steps from
/// 0.000512 to 0.001 and from 0.512 to 1.0 rather than continuing with exact doublings.
/// Histograms that only need part of this range should select it with
/// `histogram_seconds_buckets`.
///
/// Note that any changes to this range may modify buckets for existing metrics.
const HISTOGRAM_SECOND_BUCKETS: [f64; 31] = [
    // Microsecond range.
    0.000_008, 0.000_016, 0.000_032, 0.000_064, 0.000_128, 0.000_256, 0.000_512,
    // Millisecond range.
    0.001, 0.002, 0.004, 0.008, 0.016, 0.032, 0.064, 0.128, 0.256, 0.512,
    // Second range.
    1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0, 2048.0, 4096.0, 8192.0,
];
28
29/// Returns a `Vec` of time buckets that are both present in our standard
30/// buckets above and within the provided inclusive range. (This makes it
31/// more meaningful to compare latency percentiles across histograms if needed,
32/// without requiring all metrics to use exactly the same buckets.)
33pub fn histogram_seconds_buckets(from: f64, to: f64) -> Vec<f64> {
34    let mut vec = Vec::with_capacity(HISTOGRAM_SECOND_BUCKETS.len());
35    vec.extend(
36        HISTOGRAM_SECOND_BUCKETS
37            .iter()
38            .copied()
39            .filter(|&b| b >= from && b <= to),
40    );
41    vec
42}
43
/// A standard range of buckets for timing data, measured in milliseconds.
/// Individual histograms may only need a subset of this range, in which case,
/// see `histogram_milliseconds_buckets` below.
///
/// Boundaries double from 0.128 ms up to 32000 ms, except that the sequence steps from 512 to
/// 1000 rather than 1024, so every boundary from 1000 upwards is a whole number of seconds.
///
/// Note that any changes to this range may modify buckets for existing metrics.
const HISTOGRAM_MILLISECOND_BUCKETS: [f64; 19] = [
    0.128, 0.256, 0.512, 1., 2., 4., 8., 16., 32., 64., 128., 256., 512., 1000., 2000., 4000.,
    8000., 16000., 32000.,
];
53
54/// Returns a `Vec` of time buckets that are both present in our standard
55/// buckets above and within the provided inclusive range. (This makes it
56/// more meaningful to compare latency percentiles across histograms if needed,
57/// without requiring all metrics to use exactly the same buckets.)
58pub fn histogram_milliseconds_buckets(from_ms: f64, to_ms: f64) -> Vec<f64> {
59    let mut vec = Vec::with_capacity(HISTOGRAM_MILLISECOND_BUCKETS.len());
60    vec.extend(
61        HISTOGRAM_MILLISECOND_BUCKETS
62            .iter()
63            .copied()
64            .filter(|&b| b >= from_ms && b <= to_ms),
65    );
66    vec
67}
68
/// Bucket boundaries for size histograms, covering 64 bytes up to one gibibyte.
///
/// Each boundary is 16 times the previous one.
pub const HISTOGRAM_BYTE_BUCKETS: [f64; 7] = [
    64.0,            // 64 B
    1_024.0,         // 1 KiB
    16_384.0,        // 16 KiB
    262_144.0,       // 256 KiB
    4_194_304.0,     // 4 MiB
    67_108_864.0,    // 64 MiB
    1_073_741_824.0, // 1 GiB
];
79
/// Keeps track of the minimum and maximum value over a fixed-size sliding window of samples.
///
/// Inspired by the [`moving_min_max`] crate, see that crate's documentation for a description of
/// the high-level algorithm used here.
///
/// There are two major differences to [`moving_min_max`]:
///  * `SlidingMinMax` tracks both the minimum and the maximum value at the same time.
///  * `SlidingMinMax` assumes a fixed-size window. Pushing new samples automatically pops old ones
///    and there is no support for manually popping samples.
///
/// A window size of zero is permitted: such a window never holds any samples, so
/// [`SlidingMinMax::add_sample`] discards its argument and [`SlidingMinMax::get`] always returns
/// `None`.
///
/// The memory required for a `SlidingMinMax` value is `size_of::<T> * 3 * window_size`, plus a
/// small constant overhead.
///
/// [`moving_min_max`]: https://crates.io/crates/moving_min_max
#[derive(Debug)]
pub struct SlidingMinMax<T> {
    /// The push stack and the pop stack, merged into one allocation to optimize memory usage.
    ///
    /// Each entry is `(sample, min, max)`, where `min` and `max` cover the entry's own sample and
    /// the samples of all entries below it in the same stack.
    ///
    /// The push stack is the first `push_stack_len` items, the pop stack is the rest.
    /// The push stack grows to the right, the pop stack grows to the left.
    ///
    /// ```text
    /// +--------------------------------+
    /// | push stack --> | <-- pop stack |
    /// +----------------^---------------^
    ///           push_stack_len
    /// ```
    ///
    /// New samples are pushed to the push stack, together with the current minimum and maximum
    /// values. If the pop stack is not empty, each push implicitly pops an element from the pop
    /// stack, by increasing `push_stack_len`. Once the push stack has reached the window size,
    /// we "flip" the stacks by converting the push stack into a full pop stack with an inverted
    /// order of samples and min/max values. After the flip, `push_stack_len` is zero again, and
    /// new samples can be pushed to the push stack.
    stacks: Vec<(T, T, T)>,
    /// The length of the push stack.
    ///
    /// The top of the push stack is `stacks[push_stack_len - 1]`.
    /// The top of the pop stack is `stacks[push_stack_len]`.
    push_stack_len: usize,
    /// The number of most recent samples covered by the window.
    ///
    /// This is stored explicitly instead of being read back from `stacks.capacity()`:
    /// `Vec::with_capacity` only promises *at least* the requested capacity, and a vector that
    /// starts with zero capacity grows on its first push, so the capacity is not a reliable
    /// record of the requested window size.
    window_size: usize,
}

impl<T> SlidingMinMax<T>
where
    T: Clone + PartialOrd,
{
    /// Creates a new `SlidingMinMax` for the given window size.
    ///
    /// Storage for `window_size` entries is reserved up front.
    pub fn new(window_size: usize) -> Self {
        Self {
            stacks: Vec::with_capacity(window_size),
            push_stack_len: 0,
            window_size,
        }
    }

    /// Returns a reference to the item at the top of the push stack.
    fn top_of_push_stack(&self) -> Option<&(T, T, T)> {
        self.push_stack_len.checked_sub(1).map(|i| &self.stacks[i])
    }

    /// Returns a reference to the item at the top of the pop stack.
    fn top_of_pop_stack(&self) -> Option<&(T, T, T)> {
        self.stacks.get(self.push_stack_len)
    }

    /// Adds the given sample.
    ///
    /// Once the window is full, adding a sample also evicts the oldest one. If the window size
    /// is zero, the sample is discarded.
    pub fn add_sample(&mut self, sample: T) {
        // An empty window can never hold a sample.
        if self.window_size == 0 {
            return;
        }

        // The push stack spans the whole window, so the pop stack is empty: turn the push stack
        // into a fresh pop stack to make room.
        if self.push_stack_len == self.window_size {
            self.flip_stacks();
        }

        let (min, max) = match self.top_of_push_stack() {
            Some((_, min, max)) => {
                let min = po_min(min, &sample).clone();
                let max = po_max(max, &sample).clone();
                (min, max)
            }
            None => (sample.clone(), sample.clone()),
        };

        let entry = (sample, min, max);
        if self.push_stack_len < self.stacks.len() {
            // Overwriting the top of the pop stack implicitly pops the oldest sample.
            self.stacks[self.push_stack_len] = entry;
        } else {
            // The window has not filled up yet, so there is nothing to evict.
            self.stacks.push(entry);
        }
        self.push_stack_len += 1;
    }

    /// Drains the push stack into the pop stack.
    ///
    /// Entries are rewritten in place from the newest sample to the oldest, so that afterwards
    /// each entry holds the min/max of its own sample and all newer ones, and the oldest sample
    /// ends up at the top of the pop stack. Does nothing if the push stack is empty.
    fn flip_stacks(&mut self) {
        let Some(newest) = self.top_of_push_stack().map(|(sample, _, _)| sample.clone()) else {
            return;
        };

        // The newest sample becomes the bottom of the pop stack; it is its own min and max.
        self.push_stack_len -= 1;
        self.stacks[self.push_stack_len] = (newest.clone(), newest.clone(), newest);

        while let Some((sample, _, _)) = self.top_of_push_stack() {
            let (_, min, max) = self.top_of_pop_stack().expect("pop stack not empty");
            let sample = sample.clone();
            let min = po_min(min, &sample).clone();
            let max = po_max(max, &sample).clone();

            self.push_stack_len -= 1;
            self.stacks[self.push_stack_len] = (sample, min, max);
        }
    }

    /// Returns the current minimum and maximum values.
    ///
    /// Returns `None` if the window holds no samples.
    pub fn get(&self) -> Option<(&T, &T)> {
        match (self.top_of_push_stack(), self.top_of_pop_stack()) {
            (None, None) => None,
            (None, Some((_, min, max))) | (Some((_, min, max)), None) => Some((min, max)),
            // Samples are split across both stacks, so combine the two summaries.
            (Some((_, min1, max1)), Some((_, min2, max2))) => {
                Some((po_min(min1, min2), po_max(max1, max2)))
            }
        }
    }
}

/// Like `std::cmp::min`, but works with `PartialOrd` values.
///
/// Returns `a` only if `a < b`. If `a` and `b` are equal or not comparable, `b` is returned.
fn po_min<T: PartialOrd>(a: T, b: T) -> T {
    if a < b { a } else { b }
}

/// Like `std::cmp::max`, but works with `PartialOrd` values.
///
/// Returns `a` only if `a > b`. If `a` and `b` are equal or not comparable, `b` is returned.
fn po_max<T: PartialOrd>(a: T, b: T) -> T {
    if a > b { a } else { b }
}
210
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds a fixed sequence of samples through a window of size 5 and checks the reported
    /// minimum and maximum after every sample.
    #[mz_ore::test]
    fn minmax() {
        let mut minmax = SlidingMinMax::new(5);

        assert_eq!(minmax.get(), None);

        // Each step is `(sample, (expected min, expected max))`, where the expectation covers
        // the five most recent samples (or all of them while the window is still filling up).
        let steps = [
            (5, (5, 5)),
            (1, (1, 5)),
            (10, (1, 10)),
            (2, (1, 10)),
            (9, (1, 10)),
            (3, (1, 10)),
            (8, (2, 10)),
            (5, (2, 9)),
            (5, (3, 9)),
            (5, (3, 8)),
            (5, (5, 8)),
            (5, (5, 5)),
        ];

        for (sample, expected) in steps {
            minmax.add_sample(sample);
            let actual = minmax.get().map(|(min, max)| (*min, *max));
            assert_eq!(actual, Some(expected), "{minmax:?}");
        }
    }
}