mz_ore/
stats.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Statistics utilities.
17
/// The shared set of bucket boundaries for timing data, expressed in seconds.
///
/// Histograms that only care about part of this span should call
/// `histogram_seconds_buckets` to select a subset of these boundaries.
///
/// Be aware that editing this list may change the buckets of metrics that already exist.
const HISTOGRAM_SECOND_BUCKETS: [f64; 23] = [
    0.000_008, 0.000_016, 0.000_032, 0.000_064, 0.000_128, 0.000_256, 0.000_512, 0.001, 0.002,
    0.004, 0.008, 0.016, 0.032, 0.064, 0.128, 0.256, 0.512, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0,
];

/// Selects, in ascending order, the entries of `HISTOGRAM_SECOND_BUCKETS` that fall inside the
/// inclusive range `from..=to`.
///
/// Drawing every histogram's boundaries from one shared list makes latency percentiles
/// comparable across histograms, without forcing all metrics onto exactly the same buckets.
/// An empty range (`from > to`) or a NaN bound selects nothing.
pub fn histogram_seconds_buckets(from: f64, to: f64) -> Vec<f64> {
    let wanted = from..=to;
    let mut selected = Vec::with_capacity(HISTOGRAM_SECOND_BUCKETS.len());
    selected.extend(
        HISTOGRAM_SECOND_BUCKETS
            .into_iter()
            .filter(|bucket| wanted.contains(bucket)),
    );
    selected
}
42
/// A standard range of buckets for timing data, measured in milliseconds.
/// Individual histograms may only need a subset of this range, in which case,
/// see `histogram_milliseconds_buckets` below.
///
/// These are the entries of `HISTOGRAM_SECOND_BUCKETS` from 0.000_128 s up to 32 s, expressed
/// in milliseconds; keep the two lists in sync so second- and millisecond-based histograms
/// remain comparable.
///
/// Note that any changes to this range may modify buckets for existing metrics.
const HISTOGRAM_MILLISECOND_BUCKETS: [f64; 19] = [
    0.128, 0.256, 0.512, 1., 2., 4., 8., 16., 32., 64., 128., 256., 512., 1000., 2000., 4000.,
    8000., 16000., 32000.,
];

/// Returns, in ascending order, the millisecond time buckets that are both present in
/// `HISTOGRAM_MILLISECOND_BUCKETS` and within the inclusive range `from_ms..=to_ms`.
///
/// (This makes it more meaningful to compare latency percentiles across histograms if needed,
/// without requiring all metrics to use exactly the same buckets.) An empty range
/// (`from_ms > to_ms`) or a NaN bound yields an empty `Vec`.
pub fn histogram_milliseconds_buckets(from_ms: f64, to_ms: f64) -> Vec<f64> {
    let range = from_ms..=to_ms;
    let mut vec = Vec::with_capacity(HISTOGRAM_MILLISECOND_BUCKETS.len());
    vec.extend(
        HISTOGRAM_MILLISECOND_BUCKETS
            .iter()
            .copied()
            .filter(|b| range.contains(b)),
    );
    vec
}
67
/// Bucket boundaries for sizes in bytes, spanning 64 bytes up to one gigabyte (2^30 bytes).
///
/// Each boundary is 16 times the previous one: 2^6, 2^10, 2^14, ..., 2^30.
pub const HISTOGRAM_BYTE_BUCKETS: [f64; 7] = [
    64.0,            // 64 B
    1_024.0,         // 1 KiB
    16_384.0,        // 16 KiB
    262_144.0,       // 256 KiB
    4_194_304.0,     // 4 MiB
    67_108_864.0,    // 64 MiB
    1_073_741_824.0, // 1 GiB
];
78
/// Keeps track of the minimum and maximum value over a fixed-size sliding window of samples.
///
/// Inspired by the [`moving_min_max`] crate, see that crate's documentation for a description of
/// the high-level algorithm used here.
///
/// There are two major differences to [`moving_min_max`]:
///  * `SlidingMinMax` tracks both the minimum and the maximum value at the same time.
///  * `SlidingMinMax` assumes a fixed-size window. Pushing new samples automatically pops old ones
///    and there is no support for manually popping samples.
///
/// A window size of zero retains no samples: `add_sample` is then a no-op and `get` always
/// returns `None`.
///
/// The memory required for a `SlidingMinMax` value is `size_of::<T> * 3 * window_size`, plus a
/// small constant overhead.
///
/// [`moving_min_max`]: https://crates.io/crates/moving_min_max
#[derive(Debug)]
pub struct SlidingMinMax<T> {
    /// The push stack and the pop stack, merged into one allocation to optimize memory usage.
    ///
    /// Each entry is `(sample, min, max)`. For a push-stack entry, `min`/`max` cover that sample
    /// and every entry below it in the push stack. For a pop-stack entry, they cover that sample
    /// and every entry after it in `stacks` (i.e. the newer samples still in the pop stack).
    ///
    /// The push stack is the first `push_stack_len` items, the pop stack is the rest.
    /// The push stack grows to the right, the pop stack grows to the left.
    ///
    /// +--------------------------------+
    /// | push stack --> | <-- pop stack |
    /// +----------------^---------------^
    ///           push_stack_len
    ///
    ///  New samples are pushed to the push stack, together with the current minimum and maximum
    ///  values. If the pop stack is not empty, each push implicitly pops an element from the pop
    ///  stack, by increasing `push_stack_len`. Once the push stack has reached `window_size`
    ///  entries, we "flip" the stacks by converting the push stack into a full pop stack with
    ///  min/max values aggregated in the opposite direction. After the flip, `push_stack_len` is
    ///  zero again, and new samples can be pushed to the push stack.
    stacks: Vec<(T, T, T)>,
    /// The length of the push stack.
    ///
    /// The top of the push stack is `stacks[push_stack_len - 1]`.
    /// The top of the pop stack is `stacks[push_stack_len]`.
    push_stack_len: usize,
    /// The number of samples covered by the window.
    ///
    /// Stored explicitly instead of being derived from `stacks.capacity()`: `Vec::with_capacity`
    /// only guarantees a capacity of *at least* the requested size, and for zero-sized `T` the
    /// capacity is `usize::MAX`, which would make the window unbounded.
    window_size: usize,
}

impl<T> SlidingMinMax<T>
where
    T: Clone + PartialOrd,
{
    /// Creates a new `SlidingMinMax` for the given window size.
    pub fn new(window_size: usize) -> Self {
        Self {
            stacks: Vec::with_capacity(window_size),
            push_stack_len: 0,
            window_size,
        }
    }

    /// Returns a reference to the item at the top of the push stack.
    fn top_of_push_stack(&self) -> Option<&(T, T, T)> {
        self.push_stack_len.checked_sub(1).map(|i| &self.stacks[i])
    }

    /// Returns a reference to the item at the top of the pop stack.
    fn top_of_pop_stack(&self) -> Option<&(T, T, T)> {
        self.stacks.get(self.push_stack_len)
    }

    /// Adds the given sample, evicting the oldest sample once the window is full.
    ///
    /// Does nothing if the window size is zero.
    pub fn add_sample(&mut self, sample: T) {
        // A zero-sized window retains nothing. Without this guard the push stack would grow
        // without bound, because `push_stack_len` would never equal the window size again.
        if self.window_size == 0 {
            return;
        }

        if self.push_stack_len == self.window_size {
            self.flip_stacks();
        }

        let (min, max) = match self.top_of_push_stack() {
            Some((_, min, max)) => (po_min(min, &sample).clone(), po_max(max, &sample).clone()),
            None => (sample.clone(), sample.clone()),
        };

        let entry = (sample, min, max);
        if self.push_stack_len == self.stacks.len() {
            // Still filling the window for the first time: grow the allocation.
            self.stacks.push(entry);
        } else {
            // Overwriting the top of the pop stack implicitly evicts the oldest sample.
            self.stacks[self.push_stack_len] = entry;
        }
        self.push_stack_len += 1;
    }

    /// Drains the push stack into the pop stack.
    ///
    /// Only called when the push stack holds `window_size` entries, so the pop stack is empty.
    /// Samples stay in place; only their `min`/`max` companions are recomputed, walking from the
    /// newest entry down to the oldest, so each entry ends up covering itself and every newer
    /// entry. Afterwards the push stack is empty and the top of the pop stack (index 0) is the
    /// oldest sample.
    fn flip_stacks(&mut self) {
        debug_assert_eq!(self.push_stack_len, self.stacks.len(), "pop stack must be empty");

        let Some(newest) = self.push_stack_len.checked_sub(1) else {
            return;
        };

        // The newest sample is the bottom of the new pop stack and only covers itself.
        let entry = &mut self.stacks[newest];
        entry.1 = entry.0.clone();
        entry.2 = entry.0.clone();

        for i in (0..newest).rev() {
            // Combine this sample with the aggregate of all newer pop-stack entries.
            let (min, max) = {
                let sample = &self.stacks[i].0;
                let (_, newer_min, newer_max) = &self.stacks[i + 1];
                (
                    po_min(newer_min, sample).clone(),
                    po_max(newer_max, sample).clone(),
                )
            };
            let entry = &mut self.stacks[i];
            entry.1 = min;
            entry.2 = max;
        }

        self.push_stack_len = 0;
    }

    /// Returns the current minimum and maximum values.
    ///
    /// Returns `None` if no samples have been added (which is always the case for a window size
    /// of zero).
    pub fn get(&self) -> Option<(&T, &T)> {
        match (self.top_of_push_stack(), self.top_of_pop_stack()) {
            (None, None) => None,
            (None, Some((_, min, max))) | (Some((_, min, max)), None) => Some((min, max)),
            (Some((_, push_min, push_max)), Some((_, pop_min, pop_max))) => {
                Some((po_min(push_min, pop_min), po_max(push_max, pop_max)))
            }
        }
    }
}

/// Like `std::cmp::min`, but works with `PartialOrd` values.
///
/// If `a` and `b` are not comparable, `b` is returned.
fn po_min<T: PartialOrd>(a: T, b: T) -> T {
    if a < b { a } else { b }
}

/// Like `std::cmp::max`, but works with `PartialOrd` values.
///
/// If `a` and `b` are not comparable, `b` is returned.
fn po_max<T: PartialOrd>(a: T, b: T) -> T {
    if a > b { a } else { b }
}
209
#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn minmax() {
        let mut minmax = SlidingMinMax::new(5);

        assert_eq!(minmax.get(), None);

        // Each step adds one sample to a window of five and states the expected (min, max).
        let steps = [
            (5, (5, 5)),
            (1, (1, 5)),
            (10, (1, 10)),
            (2, (1, 10)),
            (9, (1, 10)),
            (3, (1, 10)),
            (8, (2, 10)),
            (5, (2, 9)),
            (5, (3, 9)),
            (5, (3, 8)),
            (5, (5, 8)),
            (5, (5, 5)),
        ];

        for (sample, expected) in steps {
            minmax.add_sample(sample);
            let observed = minmax.get().map(|(lo, hi)| (*lo, *hi));
            assert_eq!(observed, Some(expected), "{minmax:?}");
        }
    }
}