differential_dataflow/trace/cursor/
cursor_list.rs

1//! A generic cursor implementation merging multiple cursors.
2
3use super::Cursor;
4
/// Presents a list of cursors of one type as a single merged cursor.
///
/// Two index lists are kept next to the wrapped cursors: `min_key` names the cursors
/// whose current key is the least key on offer, and `min_val` names those among them
/// whose current value is also the least. No cleverer bookkeeping is attempted; both
/// lists are rebuilt by a linear scan.
#[derive(Debug)]
pub struct CursorList<C> {
    /// The wrapped cursors; the cursor at position `i` is read through `storage[i]`.
    cursors: Vec<C>,
    /// Positions in `cursors` of the cursors currently at the minimum key.
    min_key: Vec<usize>,
    /// Positions in `cursors` of the minimum-key cursors currently at the minimum value.
    min_val: Vec<usize>,
}
15
16impl<C: Cursor> CursorList<C> {
17    /// Creates a new cursor list from pre-existing cursors.
18    pub fn new(cursors: Vec<C>, storage: &[C::Storage]) -> Self  {
19        let mut result = CursorList {
20            cursors,
21            min_key: Vec::new(),
22            min_val: Vec::new(),
23        };
24
25        result.minimize_keys(storage);
26        result
27    }
28
29    /// Initialize min_key with the indices of cursors with the minimum key.
30    ///
31    /// This method scans the current keys of each cursor, and tracks the indices
32    /// of cursors whose key equals the minimum valid key seen so far. As it goes,
33    /// if it observes an improved key it clears the current list, updates the
34    /// minimum key, and continues.
35    ///
36    /// Once finished, it invokes `minimize_vals()` to ensure the value cursor is
37    /// in a consistent state as well.
38    fn minimize_keys(&mut self, storage: &[C::Storage]) {
39
40        self.min_key.clear();
41
42        // We'll visit each non-`None` key, maintaining the indexes of the least keys in `self.min_key`.
43        let mut iter = self.cursors.iter().enumerate().flat_map(|(idx, cur)| cur.get_key(&storage[idx]).map(|key| (idx, key)));
44        if let Some((idx, key)) = iter.next() {
45            let mut min_key = key;
46            self.min_key.push(idx);
47            for (idx, key) in iter {
48                match key.cmp(&min_key) {
49                    std::cmp::Ordering::Less => {
50                        self.min_key.clear();
51                        self.min_key.push(idx);
52                        min_key = key;
53                    }
54                    std::cmp::Ordering::Equal => {
55                        self.min_key.push(idx);
56                    }
57                    std::cmp::Ordering::Greater => { }
58                }
59            }
60        }
61
62        self.minimize_vals(storage);
63    }
64
65    /// Initialize min_val with the indices of minimum key cursors with the minimum value.
66    ///
67    /// This method scans the current values of cursor with minimum keys, and tracks the
68    /// indices of cursors whose value equals the minimum valid value seen so far. As it
69    /// goes, if it observes an improved value it clears the current list, updates the minimum
70    /// value, and continues.
71    fn minimize_vals(&mut self, storage: &[C::Storage]) {
72
73        self.min_val.clear();
74
75        // We'll visit each non-`None` value, maintaining the indexes of the least values in `self.min_val`.
76        let mut iter = self.min_key.iter().cloned().flat_map(|idx| self.cursors[idx].get_val(&storage[idx]).map(|val| (idx, val)));
77        if let Some((idx, val)) = iter.next() {
78            let mut min_val = val;
79            self.min_val.push(idx);
80            for (idx, val) in iter {
81                match val.cmp(&min_val) {
82                    std::cmp::Ordering::Less => {
83                        self.min_val.clear();
84                        self.min_val.push(idx);
85                        min_val = val;
86                    }
87                    std::cmp::Ordering::Equal => {
88                        self.min_val.push(idx);
89                    }
90                    std::cmp::Ordering::Greater => { }
91                }
92            }
93        }
94    }
95}
96
97use crate::trace::implementations::WithLayout;
98impl<C: Cursor> WithLayout for CursorList<C> {
99    type Layout = C::Layout;
100}
101
102impl<C: Cursor> Cursor for CursorList<C> {
103
104    type Storage = Vec<C::Storage>;
105
106    // validation methods
107    #[inline]
108    fn key_valid(&self, _storage: &Vec<C::Storage>) -> bool { !self.min_key.is_empty() }
109    #[inline]
110    fn val_valid(&self, _storage: &Vec<C::Storage>) -> bool { !self.min_val.is_empty() }
111
112    // accessors
113    #[inline]
114    fn key<'a>(&self, storage: &'a Vec<C::Storage>) -> Self::Key<'a> {
115        debug_assert!(self.key_valid(storage));
116        debug_assert!(self.cursors[self.min_key[0]].key_valid(&storage[self.min_key[0]]));
117        self.cursors[self.min_key[0]].key(&storage[self.min_key[0]])
118    }
119    #[inline]
120    fn val<'a>(&self, storage: &'a Vec<C::Storage>) -> Self::Val<'a> {
121        debug_assert!(self.key_valid(storage));
122        debug_assert!(self.val_valid(storage));
123        debug_assert!(self.cursors[self.min_val[0]].val_valid(&storage[self.min_val[0]]));
124        self.cursors[self.min_val[0]].val(&storage[self.min_val[0]])
125    }
126    #[inline]
127    fn get_key<'a>(&self, storage: &'a Vec<C::Storage>) -> Option<Self::Key<'a>> {
128        self.min_key.get(0).map(|idx| self.cursors[*idx].key(&storage[*idx]))
129    }
130    #[inline]
131    fn get_val<'a>(&self, storage: &'a Vec<C::Storage>) -> Option<Self::Val<'a>> {
132        self.min_val.get(0).map(|idx| self.cursors[*idx].val(&storage[*idx]))
133    }
134
135    #[inline]
136    fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Vec<C::Storage>, mut logic: L) {
137        for &index in self.min_val.iter() {
138            self.cursors[index].map_times(&storage[index], |t,d| logic(t,d));
139        }
140    }
141
142    // key methods
143    #[inline]
144    fn step_key(&mut self, storage: &Vec<C::Storage>) {
145        for &index in self.min_key.iter() {
146            self.cursors[index].step_key(&storage[index]);
147        }
148        self.minimize_keys(storage);
149    }
150    #[inline]
151    fn seek_key(&mut self, storage: &Vec<C::Storage>, key: Self::Key<'_>) {
152        for (cursor, storage) in self.cursors.iter_mut().zip(storage) {
153            cursor.seek_key(storage, key);
154        }
155        self.minimize_keys(storage);
156    }
157
158    // value methods
159    #[inline]
160    fn step_val(&mut self, storage: &Vec<C::Storage>) {
161        for &index in self.min_val.iter() {
162            self.cursors[index].step_val(&storage[index]);
163        }
164        self.minimize_vals(storage);
165    }
166    #[inline]
167    fn seek_val(&mut self, storage: &Vec<C::Storage>, val: Self::Val<'_>) {
168        for (cursor, storage) in self.cursors.iter_mut().zip(storage) {
169            cursor.seek_val(storage, val);
170        }
171        self.minimize_vals(storage);
172    }
173
174    // rewinding methods
175    #[inline]
176    fn rewind_keys(&mut self, storage: &Vec<C::Storage>) {
177        for (cursor, storage) in self.cursors.iter_mut().zip(storage) {
178            cursor.rewind_keys(storage);
179        }
180        self.minimize_keys(storage);
181    }
182    #[inline]
183    fn rewind_vals(&mut self, storage: &Vec<C::Storage>) {
184        for &index in self.min_key.iter() {
185            self.cursors[index].rewind_vals(&storage[index]);
186        }
187        self.minimize_vals(storage);
188    }
189}