mz_ore/stats.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Statistics utilities.
17
/// The shared set of histogram bucket boundaries for timing data, in seconds.
///
/// The boundaries run from 8 microseconds up to 8192 seconds, each one roughly
/// double the previous. Histograms that only need part of this range should
/// call `histogram_seconds_buckets` below instead of defining their own.
///
/// Note that any changes to this range may modify buckets for existing metrics.
const HISTOGRAM_SECOND_BUCKETS: [f64; 31] = [
    // Below one millisecond.
    0.000_008, 0.000_016, 0.000_032, 0.000_064, 0.000_128, 0.000_256, 0.000_512,
    // From one millisecond up to (but excluding) one second.
    0.001, 0.002, 0.004, 0.008, 0.016, 0.032, 0.064, 0.128, 0.256, 0.512,
    // One second and above.
    1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0, 2048.0, 4096.0, 8192.0,
];

/// Selects the standard second-based buckets that fall within `[from, to]`.
///
/// Both bounds are inclusive and the result preserves the ascending order of
/// the standard buckets. Because every histogram draws from the same set of
/// boundaries, latency percentiles remain comparable across histograms even
/// when they cover different ranges.
///
/// The result is empty when no standard bucket lies in the range, e.g. when
/// `from > to` or when either bound is NaN.
pub fn histogram_seconds_buckets(from: f64, to: f64) -> Vec<f64> {
    // Reserve room for the full set up front so the result never reallocates.
    let mut selected = Vec::with_capacity(HISTOGRAM_SECOND_BUCKETS.len());
    for bucket in HISTOGRAM_SECOND_BUCKETS {
        if from <= bucket && bucket <= to {
            selected.push(bucket);
        }
    }
    selected
}
43
/// A standard range of buckets for timing data, measured in milliseconds.
/// Individual histograms may only need a subset of this range, in which case,
/// see `histogram_milliseconds_buckets` below.
///
/// The boundaries run from 0.128 ms up to 32000 ms, each one roughly double
/// the previous.
///
/// Note that any changes to this range may modify buckets for existing metrics.
const HISTOGRAM_MILLISECOND_BUCKETS: [f64; 19] = [
    0.128, 0.256, 0.512, 1., 2., 4., 8., 16., 32., 64., 128., 256., 512., 1000., 2000., 4000.,
    8000., 16000., 32000.,
];

/// Returns a `Vec` of time buckets, in milliseconds, that are both present in
/// our standard millisecond buckets above and within the provided inclusive
/// range `[from_ms, to_ms]`. (This makes it more meaningful to compare latency
/// percentiles across histograms if needed, without requiring all metrics to
/// use exactly the same buckets.)
///
/// The buckets are returned in ascending order. The result is empty when no
/// standard bucket lies in the range, e.g. when `from_ms > to_ms` or when
/// either bound is NaN.
pub fn histogram_milliseconds_buckets(from_ms: f64, to_ms: f64) -> Vec<f64> {
    // Reserve room for the full set up front so the result never reallocates.
    let mut vec = Vec::with_capacity(HISTOGRAM_MILLISECOND_BUCKETS.len());
    vec.extend(
        HISTOGRAM_MILLISECOND_BUCKETS
            .iter()
            .copied()
            .filter(|&b| b >= from_ms && b <= to_ms),
    );
    vec
}
68
/// Histogram bucket boundaries for byte sizes, spanning 64 bytes to one gigabyte (2^30 bytes).
///
/// Each boundary is 16 times the previous one.
pub const HISTOGRAM_BYTE_BUCKETS: [f64; 7] = [
    64.0,            // 2^6
    1_024.0,         // 2^10
    16_384.0,        // 2^14
    262_144.0,       // 2^18
    4_194_304.0,     // 2^22
    67_108_864.0,    // 2^26
    1_073_741_824.0, // 2^30
];
79
/// Keeps track of the minimum and maximum value over a fixed-size sliding window of samples.
///
/// Inspired by the [`moving_min_max`] crate, see that crate's documentation for a description of
/// the high-level algorithm used here.
///
/// There are two major differences to [`moving_min_max`]:
///  * `SlidingMinMax` tracks both the minimum and the maximum value at the same time.
///  * `SlidingMinMax` assumes a fixed-size window. Pushing new samples automatically pops old ones
///    and there is no support for manually popping samples.
///
/// A window size of zero is allowed: such a window never contains any samples, so added samples
/// are discarded and [`SlidingMinMax::get`] always returns `None`.
///
/// The memory required for a `SlidingMinMax` value is `size_of::<T>() * 3 * window_size`, plus a
/// small constant overhead.
///
/// [`moving_min_max`]: https://crates.io/crates/moving_min_max
#[derive(Debug)]
pub struct SlidingMinMax<T> {
    /// The push stack and the pop stack, merged into one allocation to optimize memory usage.
    ///
    /// Each entry is `(sample, min, max)`, where `min` and `max` cover the entry's sample and all
    /// samples below it on the same stack.
    ///
    /// The push stack is the first `push_stack_len` items, the pop stack is the rest.
    /// The push stack grows to the right, the pop stack grows to the left.
    ///
    /// ```text
    /// +--------------------------------+
    /// | push stack --> | <-- pop stack |
    /// +----------------^---------------^
    ///           push_stack_len
    /// ```
    ///
    /// New samples are pushed to the push stack, together with the current minimum and maximum
    /// values. If the pop stack is not empty, each push implicitly pops an element from the pop
    /// stack, by increasing `push_stack_len`. Once the push stack has reached the window size,
    /// we "flip" the stacks by converting the push stack into a full pop stack with an inverted
    /// order of samples and min/max values. After the flip, `push_stack_len` is zero again, and
    /// new samples can be pushed to the push stack.
    stacks: Vec<(T, T, T)>,
    /// The length of the push stack.
    ///
    /// The top of the push stack is `stacks[push_stack_len - 1]`.
    /// The top of the pop stack is `stacks[push_stack_len]`.
    push_stack_len: usize,
    /// The number of most recent samples covered by the window.
    ///
    /// Stored explicitly rather than derived from `stacks.capacity()`, because `Vec` only
    /// guarantees a capacity of *at least* the requested size.
    window_size: usize,
}

impl<T> SlidingMinMax<T>
where
    T: Clone + PartialOrd,
{
    /// Creates a new `SlidingMinMax` for the given window size.
    ///
    /// Storage for the whole window is reserved up front, so adding samples never reallocates.
    pub fn new(window_size: usize) -> Self {
        Self {
            stacks: Vec::with_capacity(window_size),
            push_stack_len: 0,
            window_size,
        }
    }

    /// Returns a reference to the item at the top of the push stack.
    fn top_of_push_stack(&self) -> Option<&(T, T, T)> {
        self.push_stack_len.checked_sub(1).map(|i| &self.stacks[i])
    }

    /// Returns a reference to the item at the top of the pop stack.
    fn top_of_pop_stack(&self) -> Option<&(T, T, T)> {
        self.stacks.get(self.push_stack_len)
    }

    /// Adds the given sample.
    ///
    /// Once the window is full, each added sample evicts the oldest one. For a window size of
    /// zero the sample is discarded.
    pub fn add_sample(&mut self, sample: T) {
        // A zero-sized window can never hold a sample.
        if self.window_size == 0 {
            return;
        }

        // The push stack fills the whole window: turn it into the pop stack to make room.
        if self.push_stack_len == self.window_size {
            self.flip_stacks();
        }

        // The new entry's min/max summarize the entire push stack including the new sample.
        let (min, max) = match self.top_of_push_stack() {
            Some((_, min, max)) => {
                let min = po_min(min, &sample).clone();
                let max = po_max(max, &sample).clone();
                (min, max)
            }
            None => (sample.clone(), sample.clone()),
        };

        if self.stacks.len() <= self.push_stack_len {
            // The window has not been filled for the first time yet; grow into reserved space.
            self.stacks.push((sample, min, max));
        } else {
            // Overwrite the top of the pop stack, which holds the oldest sample in the window.
            self.stacks[self.push_stack_len] = (sample, min, max);
        }
        self.push_stack_len += 1;
    }

    /// Drains the push stack into the pop stack.
    ///
    /// The samples stay where they are; only the min/max of each entry is recomputed so that it
    /// covers the entry's sample and all samples to its right, which is the summary a pop stack
    /// entry must carry. Does nothing if the push stack is empty.
    fn flip_stacks(&mut self) {
        let Some(top) = self.push_stack_len.checked_sub(1) else {
            return;
        };

        // The top of the push stack becomes the bottom of the pop stack: it only covers itself.
        let bottom = &mut self.stacks[top];
        bottom.1 = bottom.0.clone();
        bottom.2 = bottom.0.clone();

        // Walk towards the front, folding each sample into the summary of the entry to its right.
        for i in (0..top).rev() {
            let (lower, upper) = self.stacks.split_at_mut(i + 1);
            let entry = &mut lower[i];
            let (_, right_min, right_max) = &upper[0];
            let min = po_min(right_min, &entry.0).clone();
            let max = po_max(right_max, &entry.0).clone();
            entry.1 = min;
            entry.2 = max;
        }

        self.push_stack_len = 0;
    }

    /// Returns the current minimum and maximum values.
    ///
    /// Returns `None` if no samples are in the window.
    pub fn get(&self) -> Option<(&T, &T)> {
        match (self.top_of_push_stack(), self.top_of_pop_stack()) {
            (None, None) => None,
            (None, Some((_, min, max))) | (Some((_, min, max)), None) => Some((min, max)),
            // The window is split across both stacks: combine their summaries.
            (Some((_, min1, max1)), Some((_, min2, max2))) => {
                Some((po_min(min1, min2), po_max(max1, max2)))
            }
        }
    }
}

/// Like `std::cmp::min`, but works with `PartialOrd` values.
///
/// If `a` and `b` are not comparable, `b` is returned.
fn po_min<T: PartialOrd>(a: T, b: T) -> T {
    if a < b { a } else { b }
}

/// Like `std::cmp::max`, but works with `PartialOrd` values.
///
/// If `a` and `b` are not comparable, `b` is returned.
fn po_max<T: PartialOrd>(a: T, b: T) -> T {
    if a > b { a } else { b }
}
210
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds a fixed sequence of samples through a window of five and checks the reported
    /// minimum and maximum after every step, including the steps where old samples drop out.
    #[mz_ore::test]
    fn minmax() {
        let mut minmax = SlidingMinMax::new(5);

        // Nothing has been added yet, so there is no minimum or maximum.
        assert_eq!(minmax.get(), None);

        // Each row is `(sample to add, (expected min, expected max) afterwards)`.
        let steps = [
            (5, (5, 5)),
            (1, (1, 5)),
            (10, (1, 10)),
            (2, (1, 10)),
            (9, (1, 10)),
            (3, (1, 10)),
            (8, (2, 10)),
            (5, (2, 9)),
            (5, (3, 9)),
            (5, (3, 8)),
            (5, (5, 8)),
            (5, (5, 5)),
        ];

        for (x, expected) in steps {
            minmax.add_sample(x);
            let actual = minmax.get().map(|(min, max)| (*min, *max));
            assert_eq!(actual, Some(expected), "{minmax:?}");
        }
    }
}