mz_ore/stats.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Statistics utilities.
17
/// The shared ladder of latency bucket boundaries, in seconds, spanning 8µs to 32s.
///
/// The boundaries are powers of two expressed in three units: 8µs..512µs, then 1ms..512ms, then
/// 1s..32s. Histograms that only need part of this ladder should select it with
/// `histogram_seconds_buckets` below.
///
/// Note that any changes to this range may modify buckets for existing metrics.
const HISTOGRAM_SECOND_BUCKETS: [f64; 23] = [
    // 8µs through 512µs.
    0.000_008, 0.000_016, 0.000_032, 0.000_064, 0.000_128, 0.000_256, 0.000_512,
    // 1ms through 512ms.
    0.001, 0.002, 0.004, 0.008, 0.016, 0.032, 0.064, 0.128, 0.256, 0.512,
    // 1s through 32s.
    1.0, 2.0, 4.0, 8.0, 16.0, 32.0,
];
27
28/// Returns a `Vec` of time buckets that are both present in our standard
29/// buckets above and within the provided inclusive range. (This makes it
30/// more meaningful to compare latency percentiles across histograms if needed,
31/// without requiring all metrics to use exactly the same buckets.)
32pub fn histogram_seconds_buckets(from: f64, to: f64) -> Vec<f64> {
33 let mut vec = Vec::with_capacity(HISTOGRAM_SECOND_BUCKETS.len());
34 vec.extend(
35 HISTOGRAM_SECOND_BUCKETS
36 .iter()
37 .copied()
38 .filter(|&b| b >= from && b <= to),
39 );
40 vec
41}
42
/// A standard range of buckets for timing data, measured in milliseconds.
/// Individual histograms may only need a subset of this range, in which case,
/// see `histogram_milliseconds_buckets` below.
///
/// The values are the entries of `HISTOGRAM_SECOND_BUCKETS` from 0.000_128s (128µs) through
/// 32s, scaled to milliseconds.
///
/// Note that any changes to this range may modify buckets for existing metrics.
const HISTOGRAM_MILLISECOND_BUCKETS: [f64; 19] = [
    0.128, 0.256, 0.512, 1., 2., 4., 8., 16., 32., 64., 128., 256., 512., 1000., 2000., 4000.,
    8000., 16000., 32000.,
];
52
53/// Returns a `Vec` of time buckets that are both present in our standard
54/// buckets above and within the provided inclusive range. (This makes it
55/// more meaningful to compare latency percentiles across histograms if needed,
56/// without requiring all metrics to use exactly the same buckets.)
57pub fn histogram_milliseconds_buckets(from_ms: f64, to_ms: f64) -> Vec<f64> {
58 let mut vec = Vec::with_capacity(HISTOGRAM_MILLISECOND_BUCKETS.len());
59 vec.extend(
60 HISTOGRAM_MILLISECOND_BUCKETS
61 .iter()
62 .copied()
63 .filter(|&b| b >= from_ms && b <= to_ms),
64 );
65 vec
66}
67
/// Bucket boundaries for sizes in bytes, from 64 B up to 1 GiB.
///
/// Each boundary is 16 times the previous one.
pub const HISTOGRAM_BYTE_BUCKETS: [f64; 7] = [
    64.0,            // 64 B
    1_024.0,         // 1 KiB
    16_384.0,        // 16 KiB
    262_144.0,       // 256 KiB
    4_194_304.0,     // 4 MiB
    67_108_864.0,    // 64 MiB
    1_073_741_824.0, // 1 GiB
];
78
79/// Keeps track of the minimum and maximum value over a fixed-size sliding window of samples.
80///
81/// Inspired by the [`moving_min_max`] crate, see that crate's documentation for a description of
82/// the high-level algorithm used here.
83///
84/// There are two major differences to [`moving_min_max`]:
85/// * `SlidingMinMax` tracks both the minimum and the maximum value at the same time.
86/// * `SlidingMinMax` assumes a fixed-size window. Pushing new samples automatically pops old ones
87/// and there is no support for manually popping samples.
88///
89/// The memory required for a `SlidingMinMax` value is `size_of::<T> * 3 * window_size`, plus a
90/// small constant overhead.
91///
92/// [`moving_min_max`]: https://crates.io/crates/moving_min_max
93#[derive(Debug)]
94pub struct SlidingMinMax<T> {
95 /// The push stack and the pop stack, merged into one allocation to optimize memory usage.
96 ///
97 /// The push stack is the first `push_stack_len` items, the pop stack is the rest.
98 /// The push stack grows to the right, the pop stack grows to the left.
99 ///
100 /// +--------------------------------+
101 /// | push stack --> | <-- pop stack |
102 /// +----------------^---------------^
103 /// push_stack_len
104 ///
105 /// New samples are pushed to the push stack, together with the current minimum and maximum
106 /// values. If the pop stack is not empty, each push implicitly pops an element from the pop
107 /// stack, by increasing `push_stack_len`. Once the push stack has reached the window size
108 /// (i.e. the capacity of `stacks`), we "flip" the stacks by converting the push stack into a
109 /// full pop stack with an inverted order of samples and min/max values. After the flip,
110 /// `push_stack_len` is zero again, and new samples can be pushed to the push stack.
111 stacks: Vec<(T, T, T)>,
112 /// The length of the push stack.
113 ///
114 /// The top of the push stack is `stacks[push_stack_len - 1]`.
115 /// The top of the pop stack is `stacks[push_stack_len]`.
116 push_stack_len: usize,
117}
118
119impl<T> SlidingMinMax<T>
120where
121 T: Clone + PartialOrd,
122{
123 /// Creates a new `SlidingMinMax` for the given window size.
124 pub fn new(window_size: usize) -> Self {
125 Self {
126 stacks: Vec::with_capacity(window_size),
127 push_stack_len: 0,
128 }
129 }
130
131 /// Returns a reference to the item at the top of the push stack.
132 fn top_of_push_stack(&self) -> Option<&(T, T, T)> {
133 self.push_stack_len.checked_sub(1).map(|i| &self.stacks[i])
134 }
135
136 /// Returns a reference to the item at the top of the pop stack.
137 fn top_of_pop_stack(&self) -> Option<&(T, T, T)> {
138 self.stacks.get(self.push_stack_len)
139 }
140
141 /// Adds the given sample.
142 pub fn add_sample(&mut self, sample: T) {
143 if self.push_stack_len == self.stacks.capacity() {
144 self.flip_stacks();
145 }
146
147 let (min, max) = match self.top_of_push_stack() {
148 Some((_, min, max)) => {
149 let min = po_min(min, &sample).clone();
150 let max = po_max(max, &sample).clone();
151 (min, max)
152 }
153 None => (sample.clone(), sample.clone()),
154 };
155
156 if self.stacks.len() <= self.push_stack_len {
157 self.stacks.push((sample, min, max));
158 } else {
159 self.stacks[self.push_stack_len] = (sample, min, max);
160 }
161 self.push_stack_len += 1;
162 }
163
164 /// Drains the push stack into the pop stack.
165 fn flip_stacks(&mut self) {
166 let Some((sample, _, _)) = self.top_of_push_stack().cloned() else {
167 return;
168 };
169
170 self.push_stack_len -= 1;
171 self.stacks[self.push_stack_len] = (sample.clone(), sample.clone(), sample);
172
173 while let Some((sample, _, _)) = self.top_of_push_stack() {
174 let (_, min, max) = self.top_of_pop_stack().expect("pop stack not empty");
175 let sample = sample.clone();
176 let min = po_min(min, &sample).clone();
177 let max = po_max(max, &sample).clone();
178
179 self.push_stack_len -= 1;
180 self.stacks[self.push_stack_len] = (sample, min, max);
181 }
182 }
183
184 /// Returns the current minimum and maximum values.
185 pub fn get(&self) -> Option<(&T, &T)> {
186 match (self.top_of_push_stack(), self.top_of_pop_stack()) {
187 (None, None) => None,
188 (None, Some((_, min, max))) | (Some((_, min, max)), None) => Some((min, max)),
189 (Some((_, min1, max1)), Some((_, min2, max2))) => {
190 Some((po_min(min1, min2), po_max(max1, max2)))
191 }
192 }
193 }
194}
195
196/// Like `std::cmp::min`, but works with `PartialOrd` values.
197///
198/// If `a` and `b` are not comparable, `b` is returned.
199fn po_min<T: PartialOrd>(a: T, b: T) -> T {
200 if a < b { a } else { b }
201}
202
/// Returns the larger of `a` and `b` for types that are only `PartialOrd`.
///
/// `a` is returned only when it is strictly greater than `b`. Ties, and pairs that cannot be
/// compared (e.g. when a NaN is involved), yield `b`.
fn po_max<T: PartialOrd>(a: T, b: T) -> T {
    match a.partial_cmp(&b) {
        Some(std::cmp::Ordering::Greater) => a,
        _ => b,
    }
}
209
#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn minmax() {
        let mut minmax = SlidingMinMax::new(5);

        assert_eq!(minmax.get(), None);

        // Each step adds a sample and states the expected (min, max) over the last five samples.
        let steps = [
            (5, (5, 5)),
            (1, (1, 5)),
            (10, (1, 10)),
            (2, (1, 10)),
            (9, (1, 10)),
            (3, (1, 10)),
            (8, (2, 10)),
            (5, (2, 9)),
            (5, (3, 9)),
            (5, (3, 8)),
            (5, (5, 8)),
            (5, (5, 5)),
        ];

        for (sample, expected) in steps {
            minmax.add_sample(sample);
            let actual = minmax.get().map(|(lo, hi)| (*lo, *hi));
            assert_eq!(actual, Some(expected), "{minmax:?}");
        }
    }
}