// differential_dataflow/trace/cursor/cursor_list.rs

//! A generic cursor implementation merging multiple cursors.
2
3use super::Cursor;
4
/// Provides a cursor interface over a list of cursors.
///
/// The `CursorList` tracks the indices of cursors with the minimum key, and the indices of cursors with
/// the minimum key and minimum value. It performs no clever management of these sets otherwise.
#[derive(Debug)]
pub struct CursorList<C> {
    /// The underlying cursors being merged.
    cursors: Vec<C>,
    /// Indices into `cursors` of the cursors currently positioned at the minimum key.
    min_key: Vec<usize>,
    /// Indices into `cursors` of the minimum-key cursors currently positioned at the minimum value.
    min_val: Vec<usize>,
}
15
16impl<C: Cursor> CursorList<C> {
17    /// Creates a new cursor list from pre-existing cursors.
18    pub fn new(cursors: Vec<C>, storage: &[C::Storage]) -> Self  {
19        let mut result = CursorList {
20            cursors,
21            min_key: Vec::new(),
22            min_val: Vec::new(),
23        };
24
25        result.minimize_keys(storage);
26        result
27    }
28
29    // Initialize min_key with the indices of cursors with the minimum key.
30    //
31    // This method scans the current keys of each cursor, and tracks the indices
32    // of cursors whose key equals the minimum valid key seen so far. As it goes,
33    // if it observes an improved key it clears the current list, updates the
34    // minimum key, and continues.
35    //
36    // Once finished, it invokes `minimize_vals()` to ensure the value cursor is
37    // in a consistent state as well.
38    fn minimize_keys(&mut self, storage: &[C::Storage]) {
39
40        self.min_key.clear();
41
42        // Determine the index of the cursor with minimum key.
43        let mut min_key_opt = None;
44        for (index, cursor) in self.cursors.iter().enumerate() {
45            let key = cursor.get_key(&storage[index]);
46            if key.is_some() {
47                if min_key_opt.is_none() || key.lt(&min_key_opt) {
48                    min_key_opt = key;
49                    self.min_key.clear();
50                }
51                if key.eq(&min_key_opt) {
52                    self.min_key.push(index);
53                }
54            }
55        }
56
57        self.minimize_vals(storage);
58    }
59
60    // Initialize min_val with the indices of minimum key cursors with the minimum value.
61    //
62    // This method scans the current values of cursor with minimum keys, and tracks the
63    // indices of cursors whose value equals the minimum valid value seen so far. As it
64    // goes, if it observes an improved value it clears the current list, updates the minimum
65    // value, and continues.
66    fn minimize_vals(&mut self, storage: &[C::Storage]) {
67
68        self.min_val.clear();
69
70        // Determine the index of the cursor with minimum value.
71        let mut min_val = None;
72        for &index in self.min_key.iter() {
73            let val = self.cursors[index].get_val(&storage[index]);
74            if val.is_some() {
75                if min_val.is_none() || val.lt(&min_val) {
76                    min_val = val;
77                    self.min_val.clear();
78                }
79                if val.eq(&min_val) {
80                    self.min_val.push(index);
81                }
82            }
83        }
84    }
85}
86
87impl<C: Cursor> Cursor for CursorList<C> {
88    type Key<'a> = C::Key<'a>;
89    type Val<'a> = C::Val<'a>;
90    type Time = C::Time;
91    type TimeGat<'a> = C::TimeGat<'a>;
92    type Diff = C::Diff;
93    type DiffGat<'a> = C::DiffGat<'a>;
94
95    type Storage = Vec<C::Storage>;
96
97    // validation methods
98    #[inline]
99    fn key_valid(&self, _storage: &Vec<C::Storage>) -> bool { !self.min_key.is_empty() }
100    #[inline]
101    fn val_valid(&self, _storage: &Vec<C::Storage>) -> bool { !self.min_val.is_empty() }
102
103    // accessors
104    #[inline]
105    fn key<'a>(&self, storage: &'a Vec<C::Storage>) -> Self::Key<'a> {
106        debug_assert!(self.key_valid(storage));
107        debug_assert!(self.cursors[self.min_key[0]].key_valid(&storage[self.min_key[0]]));
108        self.cursors[self.min_key[0]].key(&storage[self.min_key[0]])
109    }
110    #[inline]
111    fn val<'a>(&self, storage: &'a Vec<C::Storage>) -> Self::Val<'a> {
112        debug_assert!(self.key_valid(storage));
113        debug_assert!(self.val_valid(storage));
114        debug_assert!(self.cursors[self.min_val[0]].val_valid(&storage[self.min_val[0]]));
115        self.cursors[self.min_val[0]].val(&storage[self.min_val[0]])
116    }
117    #[inline]
118    fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Vec<C::Storage>, mut logic: L) {
119        for &index in self.min_val.iter() {
120            self.cursors[index].map_times(&storage[index], |t,d| logic(t,d));
121        }
122    }
123
124    // key methods
125    #[inline]
126    fn step_key(&mut self, storage: &Vec<C::Storage>) {
127        for &index in self.min_key.iter() {
128            self.cursors[index].step_key(&storage[index]);
129        }
130        self.minimize_keys(storage);
131    }
132    #[inline]
133    fn seek_key(&mut self, storage: &Vec<C::Storage>, key: Self::Key<'_>) {
134        for (cursor, storage) in self.cursors.iter_mut().zip(storage) {
135            cursor.seek_key(storage, key);
136        }
137        self.minimize_keys(storage);
138    }
139
140    // value methods
141    #[inline]
142    fn step_val(&mut self, storage: &Vec<C::Storage>) {
143        for &index in self.min_val.iter() {
144            self.cursors[index].step_val(&storage[index]);
145        }
146        self.minimize_vals(storage);
147    }
148    #[inline]
149    fn seek_val(&mut self, storage: &Vec<C::Storage>, val: Self::Val<'_>) {
150        for (cursor, storage) in self.cursors.iter_mut().zip(storage) {
151            cursor.seek_val(storage, val);
152        }
153        self.minimize_vals(storage);
154    }
155
156    // rewinding methods
157    #[inline]
158    fn rewind_keys(&mut self, storage: &Vec<C::Storage>) {
159        for (cursor, storage) in self.cursors.iter_mut().zip(storage) {
160            cursor.rewind_keys(storage);
161        }
162        self.minimize_keys(storage);
163    }
164    #[inline]
165    fn rewind_vals(&mut self, storage: &Vec<C::Storage>) {
166        for &index in self.min_key.iter() {
167            self.cursors[index].rewind_vals(&storage[index]);
168        }
169        self.minimize_vals(storage);
170    }
171}