// pprof/collector.rs

// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3use std::collections::hash_map::DefaultHasher;
4use std::convert::TryInto;
5use std::fmt::Debug;
6use std::hash::{Hash, Hasher};
7use std::io::{Read, Seek, SeekFrom, Write};
8
9use crate::frames::UnresolvedFrames;
10
11use aligned_vec::AVec;
12use tempfile::NamedTempFile;
13
/// Number of buckets in a [`HashCounter`]; a key's hash modulo this value
/// selects the bucket the key is stored in.
pub const BUCKETS: usize = 1 << 12;
/// Number of entry slots in each [`Bucket`]. Adding an unseen key to a bucket
/// whose slots are all occupied evicts one of the stored entries.
pub const BUCKETS_ASSOCIATIVITY: usize = 4;
/// Number of elements in the in-memory buffer of a [`TempFdArray`]: how many
/// `Entry<UnresolvedFrames>` values fit in `1 << 18` bytes (256 KiB).
pub const BUFFER_LENGTH: usize = (1 << 18) / std::mem::size_of::<Entry<UnresolvedFrames>>();
17
/// An item together with the count accumulated for it.
///
/// `Default` is derived: it produces `T::default()` for `item` and `0` for
/// `count`, and is available whenever `T: Default` — the same contract the
/// previous hand-written implementation provided.
#[derive(Debug, Default)]
pub struct Entry<T> {
    /// The value being counted.
    pub item: T,
    /// The count accumulated for `item`.
    pub count: isize,
}
32
33#[derive(Debug)]
34pub struct Bucket<T: 'static> {
35    pub length: usize,
36    entries: Box<[Entry<T>; BUCKETS_ASSOCIATIVITY]>,
37}
38
39impl<T: Eq + Default> Default for Bucket<T> {
40    fn default() -> Bucket<T> {
41        let entries = Box::default();
42
43        Self { length: 0, entries }
44    }
45}
46
47impl<T: Eq> Bucket<T> {
48    pub fn add(&mut self, key: T, count: isize) -> Option<Entry<T>> {
49        let mut done = false;
50        self.entries[0..self.length].iter_mut().for_each(|ele| {
51            if ele.item == key {
52                ele.count += count;
53                done = true;
54            }
55        });
56
57        if done {
58            None
59        } else if self.length < BUCKETS_ASSOCIATIVITY {
60            let ele = &mut self.entries[self.length];
61            ele.item = key;
62            ele.count = count;
63
64            self.length += 1;
65            None
66        } else {
67            let mut min_index = 0;
68            let mut min_count = self.entries[0].count;
69            for index in 0..self.length {
70                let count = self.entries[index].count;
71                if count < min_count {
72                    min_index = index;
73                    min_count = count;
74                }
75            }
76
77            let mut new_entry = Entry { item: key, count };
78            std::mem::swap(&mut self.entries[min_index], &mut new_entry);
79            Some(new_entry)
80        }
81    }
82
83    pub fn iter(&self) -> BucketIterator<T> {
84        BucketIterator::<T> {
85            related_bucket: self,
86            index: 0,
87        }
88    }
89}
90
91pub struct BucketIterator<'a, T: 'static> {
92    related_bucket: &'a Bucket<T>,
93    index: usize,
94}
95
96impl<'a, T> Iterator for BucketIterator<'a, T> {
97    type Item = &'a Entry<T>;
98
99    fn next(&mut self) -> Option<Self::Item> {
100        if self.index < self.related_bucket.length {
101            self.index += 1;
102            Some(&self.related_bucket.entries[self.index - 1])
103        } else {
104            None
105        }
106    }
107}
108
109pub struct HashCounter<T: Hash + Eq + 'static> {
110    buckets: Box<[Bucket<T>; BUCKETS]>,
111}
112
113impl<T: Hash + Eq + Default + Debug> Default for HashCounter<T> {
114    fn default() -> Self {
115        let mut v: Vec<Bucket<T>> = Vec::with_capacity(BUCKETS);
116        v.resize_with(BUCKETS, Default::default);
117        let buckets = v.into_boxed_slice().try_into().unwrap();
118
119        Self { buckets }
120    }
121}
122
123impl<T: Hash + Eq> HashCounter<T> {
124    fn hash(key: &T) -> u64 {
125        let mut s = DefaultHasher::new();
126        key.hash(&mut s);
127        s.finish()
128    }
129
130    pub fn add(&mut self, key: T, count: isize) -> Option<Entry<T>> {
131        let hash_value = Self::hash(&key);
132        let bucket = &mut self.buckets[(hash_value % BUCKETS as u64) as usize];
133
134        bucket.add(key, count)
135    }
136
137    pub fn iter(&self) -> impl Iterator<Item = &Entry<T>> {
138        let mut iter: Box<dyn Iterator<Item = &Entry<T>>> =
139            Box::new(self.buckets[0].iter().chain(std::iter::empty()));
140        for bucket in self.buckets[1..].iter() {
141            iter = Box::new(iter.chain(bucket.iter()));
142        }
143
144        iter
145    }
146}
147
148pub struct TempFdArray<T: 'static> {
149    file: NamedTempFile,
150    buffer: Box<[T; BUFFER_LENGTH]>,
151    buffer_index: usize,
152    flush_n: usize,
153}
154
155impl<T: Default + Debug> TempFdArray<T> {
156    fn new() -> std::io::Result<TempFdArray<T>> {
157        let file = NamedTempFile::new()?;
158
159        let mut v: Vec<T> = Vec::with_capacity(BUFFER_LENGTH);
160        v.resize_with(BUFFER_LENGTH, Default::default);
161        let buffer = v.into_boxed_slice().try_into().unwrap();
162
163        Ok(Self {
164            file,
165            buffer,
166            buffer_index: 0,
167            flush_n: 0,
168        })
169    }
170}
171
172impl<T> TempFdArray<T> {
173    fn flush_buffer(&mut self) -> std::io::Result<()> {
174        self.buffer_index = 0;
175        let buf = unsafe {
176            std::slice::from_raw_parts(
177                self.buffer.as_ptr() as *const u8,
178                BUFFER_LENGTH * std::mem::size_of::<T>(),
179            )
180        };
181        self.flush_n += 1;
182        self.file.write_all(buf)?;
183
184        Ok(())
185    }
186
187    fn push(&mut self, entry: T) -> std::io::Result<()> {
188        if self.buffer_index >= BUFFER_LENGTH {
189            self.flush_buffer()?;
190        }
191
192        self.buffer[self.buffer_index] = entry;
193        self.buffer_index += 1;
194
195        Ok(())
196    }
197
198    fn try_iter(&self) -> std::io::Result<impl Iterator<Item = &T>> {
199        let size = BUFFER_LENGTH * self.flush_n * std::mem::size_of::<T>();
200
201        let mut file_vec = AVec::with_capacity(std::mem::align_of::<T>(), size);
202        let mut file = self.file.reopen()?;
203
204        unsafe {
205            // it's safe as the capacity is initialized to `size`, and it'll be filled with `size` bytes
206            file_vec.set_len(size);
207        }
208        file.read_exact(&mut file_vec[0..size])?;
209        file.seek(SeekFrom::End(0))?;
210
211        Ok(TempFdArrayIterator {
212            buffer: &self.buffer[0..self.buffer_index],
213            file_vec,
214            index: 0,
215        })
216    }
217}
218
219pub struct TempFdArrayIterator<'a, T> {
220    pub buffer: &'a [T],
221    pub file_vec: AVec<u8>,
222    pub index: usize,
223}
224
225impl<'a, T> Iterator for TempFdArrayIterator<'a, T> {
226    type Item = &'a T;
227
228    fn next(&mut self) -> Option<Self::Item> {
229        if self.index < self.buffer.len() {
230            self.index += 1;
231            Some(&self.buffer[self.index - 1])
232        } else {
233            let length = self.file_vec.len() / std::mem::size_of::<T>();
234            let ts =
235                unsafe { std::slice::from_raw_parts(self.file_vec.as_ptr() as *const T, length) };
236            if self.index - self.buffer.len() < ts.len() {
237                self.index += 1;
238                Some(&ts[self.index - self.buffer.len() - 1])
239            } else {
240                None
241            }
242        }
243    }
244}
245
246pub struct Collector<T: Hash + Eq + 'static> {
247    map: HashCounter<T>,
248    temp_array: TempFdArray<Entry<T>>,
249}
250
251impl<T: Hash + Eq + Default + Debug + 'static> Collector<T> {
252    pub fn new() -> std::io::Result<Self> {
253        Ok(Self {
254            map: HashCounter::<T>::default(),
255            temp_array: TempFdArray::<Entry<T>>::new()?,
256        })
257    }
258}
259
260impl<T: Hash + Eq + 'static> Collector<T> {
261    pub fn add(&mut self, key: T, count: isize) -> std::io::Result<()> {
262        if let Some(evict) = self.map.add(key, count) {
263            self.temp_array.push(evict)?;
264        }
265
266        Ok(())
267    }
268
269    pub fn try_iter(&self) -> std::io::Result<impl Iterator<Item = &Entry<T>>> {
270        Ok(self.map.iter().chain(self.temp_array.try_iter()?))
271    }
272}
273
#[cfg(test)]
mod test_utils {
    use super::*;
    use std::collections::BTreeMap;

    /// Folds `entry` into `hashmap`: the entry's count is added to the total
    /// already recorded for its item, or recorded as the initial total when
    /// the item has not been seen yet.
    pub fn add_map<T: std::cmp::Ord + Copy>(hashmap: &mut BTreeMap<T, isize>, entry: &Entry<T>) {
        if let Some(total) = hashmap.get_mut(&entry.item) {
            *total += entry.count;
        } else {
            hashmap.insert(entry.item, entry.count);
        }
    }
}
288
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeMap;

    /// Two distinct keys are counted independently and both are visible
    /// through `iter`.
    #[test]
    fn stack_hash_counter() {
        let mut stack_hash_counter = HashCounter::<usize>::default();
        // Two distinct keys can never fill a bucket, so nothing is evicted.
        assert!(stack_hash_counter.add(0, 1).is_none());
        assert!(stack_hash_counter.add(1, 1).is_none());
        assert!(stack_hash_counter.add(1, 1).is_none());

        let mut seen = 0;
        for entry in stack_hash_counter.iter() {
            match entry.item {
                0 => assert_eq!(entry.count, 1),
                1 => assert_eq!(entry.count, 2),
                _ => unreachable!(),
            }
            seen += 1;
        }
        // Without this check the test would pass even if `iter` yielded nothing.
        assert_eq!(seen, 2);
    }

    /// Counts survive eviction: the evicted entries plus the entries left in
    /// the counter add up to the number of times each key was added.
    #[test]
    fn evict_test() {
        let mut stack_hash_counter = HashCounter::<usize>::default();
        let mut real_map = BTreeMap::new();

        for item in 0..(1 << 10) * 4 {
            for _ in 0..(item % 4) {
                if let Some(evict) = stack_hash_counter.add(item, 1) {
                    test_utils::add_map(&mut real_map, &evict);
                }
            }
        }

        for entry in stack_hash_counter.iter() {
            test_utils::add_map(&mut real_map, entry);
        }

        for item in 0..(1 << 10) * 4 {
            let expected = (item % 4) as isize;
            // Keys that were never added must be absent (treated as zero).
            let recorded = real_map.get(&item).copied().unwrap_or(0);
            assert_eq!(recorded, expected);
        }
    }

    /// The collector reports the right total for every key once the counts of
    /// all its entries are summed.
    #[test]
    fn collector_test() {
        let mut collector = Collector::new().unwrap();
        let mut real_map = BTreeMap::new();

        for item in 0..(1 << 12) * 4 {
            for _ in 0..(item % 4) {
                collector.add(item, 1).unwrap();
            }
        }

        for entry in collector.try_iter().unwrap() {
            test_utils::add_map(&mut real_map, entry);
        }

        for item in 0..(1 << 12) * 4 {
            let expected = (item % 4) as isize;
            let recorded = real_map.get(&item).copied().unwrap_or(0);
            assert_eq!(expected, recorded);
        }
    }

    /// A key type whose fields have different sizes, used by
    /// `collector_align_test`.
    #[derive(Debug, Hash, Eq, PartialEq, PartialOrd, Ord, Default, Clone, Copy)]
    struct AlignTest {
        a: u16,
        b: u32,
        c: u64,
        d: u64,
    }

    // collector_align_test uses a bigger item to test the alignment of the collector
    #[test]
    fn collector_align_test() {
        // Builds the key for a given loop value; the narrowing casts are
        // intentional and identical on the insert and the lookup side.
        let key_for = |item: i32| AlignTest {
            a: item as u16,
            b: item as u32,
            c: item as u64,
            d: item as u64,
        };

        let mut collector = Collector::new().unwrap();
        let mut real_map = BTreeMap::new();

        for item in 0..(1 << 12) * 4 {
            for _ in 0..(item % 4) {
                collector.add(key_for(item), 1).unwrap();
            }
        }

        for entry in collector.try_iter().unwrap() {
            test_utils::add_map(&mut real_map, entry);
        }

        for item in 0..(1 << 12) * 4 {
            let expected = (item % 4) as isize;
            let recorded = real_map.get(&key_for(item)).copied().unwrap_or(0);
            assert_eq!(expected, recorded);
        }
    }
}
425}