differential_dataflow/trace/cursor/
cursor_list.rs

1//! A generic cursor implementation merging multiple cursors.
2
3use super::Cursor;
4
/// Provides a cursor interface over a list of cursors.
///
/// The `CursorList` tracks the indices of cursors with the minimum key, and the indices of cursors with
/// the minimum key and minimum value. It performs no clever management of these sets otherwise.
///
/// Every index stored in `min_key` and `min_val` is a position in `cursors`; the same position is used
/// to select the matching entry of the storage slice passed to each method.
#[derive(Debug)]
pub struct CursorList<C> {
    /// The cursors being merged.
    cursors: Vec<C>,
    /// Indices into `cursors` of the cursors whose current key is the least key.
    min_key: Vec<usize>,
    /// Indices into `cursors` of the `min_key` cursors whose current value is the least value.
    min_val: Vec<usize>,
}
15
16impl<C: Cursor> CursorList<C> {
17    /// Creates a new cursor list from pre-existing cursors.
18    pub fn new(cursors: Vec<C>, storage: &[C::Storage]) -> Self  {
19        let mut result = CursorList {
20            cursors,
21            min_key: Vec::new(),
22            min_val: Vec::new(),
23        };
24
25        result.minimize_keys(storage);
26        result
27    }
28
29    /// Initialize min_key with the indices of cursors with the minimum key.
30    ///
31    /// This method scans the current keys of each cursor, and tracks the indices
32    /// of cursors whose key equals the minimum valid key seen so far. As it goes,
33    /// if it observes an improved key it clears the current list, updates the
34    /// minimum key, and continues.
35    ///
36    /// Once finished, it invokes `minimize_vals()` to ensure the value cursor is
37    /// in a consistent state as well.
38    fn minimize_keys(&mut self, storage: &[C::Storage]) {
39
40        self.min_key.clear();
41
42        // We'll visit each non-`None` key, maintaining the indexes of the least keys in `self.min_key`.
43        let mut iter = self.cursors.iter().enumerate().flat_map(|(idx, cur)| cur.get_key(&storage[idx]).map(|key| (idx, key)));
44        if let Some((idx, key)) = iter.next() {
45            let mut min_key = key;
46            self.min_key.push(idx);
47            for (idx, key) in iter {
48                match key.cmp(&min_key) {
49                    std::cmp::Ordering::Less => {
50                        self.min_key.clear();
51                        self.min_key.push(idx);
52                        min_key = key;
53                    }
54                    std::cmp::Ordering::Equal => {
55                        self.min_key.push(idx);
56                    }
57                    std::cmp::Ordering::Greater => { }
58                }
59            }
60        }
61
62        self.minimize_vals(storage);
63    }
64
65    /// Initialize min_val with the indices of minimum key cursors with the minimum value.
66    ///
67    /// This method scans the current values of cursor with minimum keys, and tracks the
68    /// indices of cursors whose value equals the minimum valid value seen so far. As it
69    /// goes, if it observes an improved value it clears the current list, updates the minimum
70    /// value, and continues.
71    fn minimize_vals(&mut self, storage: &[C::Storage]) {
72
73        self.min_val.clear();
74
75        // We'll visit each non-`None` value, maintaining the indexes of the least values in `self.min_val`.
76        let mut iter = self.min_key.iter().cloned().flat_map(|idx| self.cursors[idx].get_val(&storage[idx]).map(|val| (idx, val)));
77        if let Some((idx, val)) = iter.next() {
78            let mut min_val = val;
79            self.min_val.push(idx);
80            for (idx, val) in iter {
81                match val.cmp(&min_val) {
82                    std::cmp::Ordering::Less => {
83                        self.min_val.clear();
84                        self.min_val.push(idx);
85                        min_val = val;
86                    }
87                    std::cmp::Ordering::Equal => {
88                        self.min_val.push(idx);
89                    }
90                    std::cmp::Ordering::Greater => { }
91                }
92            }
93        }
94    }
95}
96
97impl<C: Cursor> Cursor for CursorList<C> {
98    type Key<'a> = C::Key<'a>;
99    type Val<'a> = C::Val<'a>;
100    type Time = C::Time;
101    type TimeGat<'a> = C::TimeGat<'a>;
102    type Diff = C::Diff;
103    type DiffGat<'a> = C::DiffGat<'a>;
104
105    type Storage = Vec<C::Storage>;
106
107    // validation methods
108    #[inline]
109    fn key_valid(&self, _storage: &Vec<C::Storage>) -> bool { !self.min_key.is_empty() }
110    #[inline]
111    fn val_valid(&self, _storage: &Vec<C::Storage>) -> bool { !self.min_val.is_empty() }
112
113    // accessors
114    #[inline]
115    fn key<'a>(&self, storage: &'a Vec<C::Storage>) -> Self::Key<'a> {
116        debug_assert!(self.key_valid(storage));
117        debug_assert!(self.cursors[self.min_key[0]].key_valid(&storage[self.min_key[0]]));
118        self.cursors[self.min_key[0]].key(&storage[self.min_key[0]])
119    }
120    #[inline]
121    fn val<'a>(&self, storage: &'a Vec<C::Storage>) -> Self::Val<'a> {
122        debug_assert!(self.key_valid(storage));
123        debug_assert!(self.val_valid(storage));
124        debug_assert!(self.cursors[self.min_val[0]].val_valid(&storage[self.min_val[0]]));
125        self.cursors[self.min_val[0]].val(&storage[self.min_val[0]])
126    }
127    #[inline]
128    fn get_key<'a>(&self, storage: &'a Vec<C::Storage>) -> Option<Self::Key<'a>> {
129        self.min_key.get(0).map(|idx| self.cursors[*idx].key(&storage[*idx]))
130    }
131    #[inline]
132    fn get_val<'a>(&self, storage: &'a Vec<C::Storage>) -> Option<Self::Val<'a>> {
133        self.min_val.get(0).map(|idx| self.cursors[*idx].val(&storage[*idx]))
134    }
135
136    #[inline]
137    fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Vec<C::Storage>, mut logic: L) {
138        for &index in self.min_val.iter() {
139            self.cursors[index].map_times(&storage[index], |t,d| logic(t,d));
140        }
141    }
142
143    // key methods
144    #[inline]
145    fn step_key(&mut self, storage: &Vec<C::Storage>) {
146        for &index in self.min_key.iter() {
147            self.cursors[index].step_key(&storage[index]);
148        }
149        self.minimize_keys(storage);
150    }
151    #[inline]
152    fn seek_key(&mut self, storage: &Vec<C::Storage>, key: Self::Key<'_>) {
153        for (cursor, storage) in self.cursors.iter_mut().zip(storage) {
154            cursor.seek_key(storage, key);
155        }
156        self.minimize_keys(storage);
157    }
158
159    // value methods
160    #[inline]
161    fn step_val(&mut self, storage: &Vec<C::Storage>) {
162        for &index in self.min_val.iter() {
163            self.cursors[index].step_val(&storage[index]);
164        }
165        self.minimize_vals(storage);
166    }
167    #[inline]
168    fn seek_val(&mut self, storage: &Vec<C::Storage>, val: Self::Val<'_>) {
169        for (cursor, storage) in self.cursors.iter_mut().zip(storage) {
170            cursor.seek_val(storage, val);
171        }
172        self.minimize_vals(storage);
173    }
174
175    // rewinding methods
176    #[inline]
177    fn rewind_keys(&mut self, storage: &Vec<C::Storage>) {
178        for (cursor, storage) in self.cursors.iter_mut().zip(storage) {
179            cursor.rewind_keys(storage);
180        }
181        self.minimize_keys(storage);
182    }
183    #[inline]
184    fn rewind_vals(&mut self, storage: &Vec<C::Storage>) {
185        for &index in self.min_key.iter() {
186            self.cursors[index].rewind_vals(&storage[index]);
187        }
188        self.minimize_vals(storage);
189    }
190}