mz_compute/compute_state/
peek_result_iterator.rs1use std::iter::FusedIterator;
9use std::num::NonZeroI64;
10use std::ops::Range;
11
12use differential_dataflow::trace::implementations::BatchContainer;
13use differential_dataflow::trace::{Cursor, TraceReader};
14use mz_ore::result::ResultExt;
15use mz_repr::fixed_length::ToDatumIter;
16use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena};
17use timely::order::PartialOrder;
18
19pub struct PeekResultIterator<Tr>
20where
21 Tr: TraceReader,
22{
23 target_id: GlobalId,
25 cursor: Tr::Cursor,
26 storage: Tr::Storage,
27 map_filter_project: mz_expr::SafeMfpPlan,
28 peek_timestamp: mz_repr::Timestamp,
29 row_builder: Row,
30 datum_vec: DatumVec,
31 literals: Option<Literals<Tr>>,
32}
33
34struct Literals<Tr: TraceReader> {
36 literals: Tr::KeyContainer,
38 range: Range<usize>,
40 current_index: Option<usize>,
42}
43
44impl<Tr: TraceReader<KeyOwn: Ord>> Literals<Tr> {
45 fn new(literals: &mut [Tr::KeyOwn], cursor: &mut Tr::Cursor, storage: &Tr::Storage) -> Self {
47 literals.sort();
50 let mut container = Tr::KeyContainer::with_capacity(literals.len());
51 for constraint in literals {
52 container.push_own(constraint)
53 }
54 let range = 0..container.len();
55 let mut this = Self {
56 literals: container,
57 range,
58 current_index: None,
59 };
60 this.seek_next_literal_key(cursor, storage);
61 this
62 }
63
64 fn peek(&self) -> Option<Tr::Key<'_>> {
66 self.current_index
67 .and_then(|index| self.literals.get(index))
68 }
69
70 fn is_exhausted(&self) -> bool {
72 self.current_index.is_none()
73 }
74
75 fn seek_next_literal_key(&mut self, cursor: &mut Tr::Cursor, storage: &Tr::Storage) {
77 while let Some(index) = self.range.next() {
78 let literal = self.literals.get(index).expect("index out of bounds");
79 cursor.seek_key(storage, literal);
80 if cursor.get_key(storage).map_or(true, |key| key == literal) {
81 self.current_index = Some(index);
82 return;
83 }
84 }
88 self.current_index = None;
89 }
90}
91
92impl<Tr> PeekResultIterator<Tr>
97where
98 for<'a> Tr: TraceReader<
99 Key<'a>: ToDatumIter + Eq,
100 KeyOwn = Row,
101 Val<'a>: ToDatumIter,
102 TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
103 DiffGat<'a> = &'a Diff,
104 >,
105{
106 pub fn new(
107 target_id: GlobalId,
108 map_filter_project: mz_expr::SafeMfpPlan,
109 peek_timestamp: mz_repr::Timestamp,
110 literal_constraints: Option<&mut [Row]>,
111 trace_reader: &mut Tr,
112 ) -> Self {
113 let (mut cursor, storage) = trace_reader.cursor();
114 let literals = literal_constraints
115 .map(|constraints| Literals::new(constraints, &mut cursor, &storage));
116
117 Self {
118 target_id,
119 cursor,
120 storage,
121 map_filter_project,
122 peek_timestamp,
123 row_builder: Row::default(),
124 datum_vec: DatumVec::new(),
125 literals,
126 }
127 }
128
129 fn literals_exhausted(&self) -> bool {
131 self.literals.as_ref().map_or(false, Literals::is_exhausted)
132 }
133}
134
135impl<Tr> FusedIterator for PeekResultIterator<Tr> where
136 for<'a> Tr: TraceReader<
137 Key<'a>: ToDatumIter + Eq,
138 KeyOwn = Row,
139 Val<'a>: ToDatumIter,
140 TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
141 DiffGat<'a> = &'a Diff,
142 >
143{
144}
145
146impl<Tr> Iterator for PeekResultIterator<Tr>
147where
148 for<'a> Tr: TraceReader<
149 Key<'a>: ToDatumIter + Eq,
150 KeyOwn = Row,
151 Val<'a>: ToDatumIter,
152 TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
153 DiffGat<'a> = &'a Diff,
154 >,
155{
156 type Item = Result<(Row, NonZeroI64), String>;
157
158 fn next(&mut self) -> Option<Self::Item> {
159 let result = loop {
160 if self.literals_exhausted() {
161 return None;
162 }
163
164 if !self.cursor.key_valid(&self.storage) {
165 return None;
166 }
167
168 if !self.cursor.val_valid(&self.storage) {
169 let exhausted = self.step_key();
170 if exhausted {
171 return None;
172 }
173 }
174
175 match self.extract_current_row() {
176 Ok(Some(row)) => break Ok(row),
177 Ok(None) => {
178 self.cursor.step_val(&self.storage);
180 }
181 Err(err) => break Err(err),
182 }
183 };
184
185 self.cursor.step_val(&self.storage);
186
187 Some(result)
188 }
189}
190
191impl<Tr> PeekResultIterator<Tr>
192where
193 for<'a> Tr: TraceReader<
194 Key<'a>: ToDatumIter + Eq,
195 KeyOwn = Row,
196 Val<'a>: ToDatumIter,
197 TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
198 DiffGat<'a> = &'a Diff,
199 >,
200{
201 fn extract_current_row(&mut self) -> Result<Option<(Row, NonZeroI64)>, String> {
205 let arena = RowArena::new();
211
212 let key_item = self.cursor.key(&self.storage);
213 let key = key_item.to_datum_iter();
214 let row_item = self.cursor.val(&self.storage);
215 let row = row_item.to_datum_iter();
216
217 let maybe_literal;
220 let mut borrow = self.datum_vec.borrow();
221 borrow.extend(key);
222 borrow.extend(row);
223
224 if let Some(literals) = &mut self.literals
225 && let Some(literal) = literals.peek()
226 {
227 maybe_literal = literal;
230 borrow.extend(maybe_literal.to_datum_iter());
231 }
232 if let Some(result) = self
233 .map_filter_project
234 .evaluate_into(&mut borrow, &arena, &mut self.row_builder)
235 .map(|row| row.cloned())
236 .map_err_to_string_with_causes()?
237 {
238 let mut copies = Diff::ZERO;
239 self.cursor.map_times(&self.storage, |time, diff| {
240 if time.less_equal(&self.peek_timestamp) {
241 copies += diff;
242 }
243 });
244 let copies: i64 = if copies.is_negative() {
245 let row = &*borrow;
246 tracing::error!(
247 target = %self.target_id, diff = %copies, ?row,
248 "index peek encountered negative multiplicities in ok trace",
249 );
250 return Err(format!(
251 "Invalid data in source, \
252 saw retractions ({}) for row that does not exist: {:?}",
253 -copies, row,
254 ));
255 } else {
256 copies.into_inner()
257 };
258 if let Some(copies) = NonZeroI64::new(copies) {
260 Ok(Some((result, copies)))
261 } else {
262 Ok(None)
263 }
264 } else {
265 Ok(None)
266 }
267 }
268
269 fn step_key(&mut self) -> bool {
273 assert!(
274 !self.cursor.val_valid(&self.storage),
275 "must only step key when the vals for a key are exhausted"
276 );
277
278 if let Some(literals) = &mut self.literals {
279 literals.seek_next_literal_key(&mut self.cursor, &self.storage);
280
281 if literals.is_exhausted() {
282 return true;
283 }
284 } else {
285 self.cursor.step_key(&self.storage);
286 }
287
288 if !self.cursor.key_valid(&self.storage) {
289 return true;
291 }
292
293 assert!(
294 self.cursor.val_valid(&self.storage),
295 "there must always be at least one val per key"
296 );
297
298 false
299 }
300}